Pub/Sub to Avro Files on Cloud Storage template

The Pub/Sub to Avro files on Cloud Storage template is a streaming pipeline that reads data from a Pub/Sub topic and writes Avro files into the specified Cloud Storage bucket.

Pipeline requirements

  • The input Pub/Sub topic must exist prior to pipeline execution.

Template parameters

Required parameters

  • inputTopic: The Pub/Sub topic to subscribe to for message consumption. The topic name must be in the format projects/<PROJECT_ID>/topics/<TOPIC_NAME>.
  • outputDirectory: The output directory where output Avro files are archived. Must end with a slash (/). For example: gs://example-bucket/example-directory/.
  • avroTempDirectory: The directory for temporary Avro files. Must end with a slash (/). For example: gs://example-bucket/example-directory/.
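
The format rules for the required parameters can be checked before a job is submitted. The following is a minimal sketch, not part of the template: the function names is_valid_topic and is_valid_gcs_dir are hypothetical, and the regular expressions only encode the two rules stated above (the projects/<PROJECT_ID>/topics/<TOPIC_NAME> shape and the trailing slash).

```shell
#!/bin/sh
# Hypothetical pre-flight checks for the required parameter formats.
# These helpers are illustrative and are not provided by the template.

# Succeeds if $1 has the form projects/<PROJECT_ID>/topics/<TOPIC_NAME>.
is_valid_topic() {
  printf '%s\n' "$1" | grep -Eq '^projects/[^/]+/topics/[^/]+$'
}

# Succeeds if $1 is a gs:// path that ends with a slash.
is_valid_gcs_dir() {
  printf '%s\n' "$1" | grep -Eq '^gs://[^/]+/(.*/)?$'
}
```

For example, is_valid_gcs_dir gs://example-bucket/example-directory/ succeeds, while the same path without the trailing slash fails.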

Optional parameters

  • outputFilenamePrefix: The output filename prefix for the Avro files. Defaults to: output.
  • outputFilenameSuffix: The output filename suffix for the Avro files. Defaults to empty.
  • outputShardTemplate: The shard template, which defines the dynamic portion of each windowed file name. By default, the pipeline writes a single shard per window, so all data for a window goes into one file. Defaults to W-P-SS-of-NN, where W is the window date range, P is the pane info, S is the shard number, and N is the number of shards. With a single file, the SS-of-NN portion resolves to 00-of-01.
  • yearPattern: Pattern for formatting the year. Must be one or more of y or Y. Case makes no difference in the year. Optionally, wrap the pattern with characters that aren't alphanumeric or the directory (/) character. Defaults to YYYY.
  • monthPattern: Pattern for formatting the month. Must be one or more of the M character. Optionally, wrap the pattern with characters that aren't alphanumeric or the directory (/) character. Defaults to MM.
  • dayPattern: Pattern for formatting the day. Must be one or more of d for day of month or D for day of year. Optionally, wrap the pattern with characters that aren't alphanumeric or the directory (/) character. Defaults to dd.
  • hourPattern: Pattern for formatting the hour. Must be one or more of the H character. Optionally, wrap the pattern with characters that aren't alphanumeric or the directory (/) character. Defaults to HH.
  • minutePattern: Pattern for formatting the minute. Must be one or more of the m character. Optionally, wrap the pattern with characters that aren't alphanumeric or the directory (/) character. Defaults to mm.
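
To make the default outputShardTemplate concrete, the sketch below expands W-P-SS-of-NN for given values. It is illustrative only and is not the pipeline's implementation: the function name expand_default_shard_template is hypothetical, and the window and pane strings are passed through as opaque text because their exact rendering is not specified here.

```shell
#!/bin/sh
# Illustrative only: shows how the default W-P-SS-of-NN template reads once
# its placeholders are filled in. Not the template's own code.
# Arguments: $1 window text, $2 pane text, $3 shard index, $4 shard count.
# The shard index and count must be plain decimal integers (no leading zeros).
expand_default_shard_template() {
  # SS and NN are two characters wide, so both numbers are zero-padded to 2 digits.
  printf '%s-%s-%02d-of-%02d\n' "$1" "$2" "$3" "$4"
}
```

Calling expand_default_shard_template WINDOW PANE 0 1 prints WINDOW-PANE-00-of-01, which matches the single-file case described for outputShardTemplate.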

Run the template

Console

  1. Go to the Dataflow Create job from template page.
  2. In the Job name field, enter a unique job name.
  3. Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  4. From the Dataflow template drop-down menu, select the Pub/Sub to Avro Files on Cloud Storage template.
  5. In the provided parameter fields, enter your parameter values.
  6. Click Run job.

gcloud

In your shell or terminal, run the template:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/Cloud_PubSub_to_Avro \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=FILENAME_PREFIX,\
outputFilenameSuffix=FILENAME_SUFFIX,\
outputShardTemplate=SHARD_TEMPLATE,\
avroTempDirectory=gs://BUCKET_NAME/temp/

Replace the following:

  • JOB_NAME: a unique job name of your choice
  • REGION_NAME: the region where you want to deploy your Dataflow job—for example, us-central1
  • VERSION: the version of the template that you want to use, for example latest for the most recent version
  • STAGING_LOCATION: the location for staging local files (for example, gs://your-bucket/staging)
  • PROJECT_ID: the ID of the Google Cloud project that contains the Pub/Sub topic
  • TOPIC_NAME: the Pub/Sub topic name
  • BUCKET_NAME: the name of your Cloud Storage bucket
  • FILENAME_PREFIX: the preferred output filename prefix
  • FILENAME_SUFFIX: the preferred output filename suffix
  • SHARD_TEMPLATE: the preferred output shard template
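
The --parameters value is a single comma-separated string with no spaces, which is easy to get wrong by hand. The sketch below assembles it from arguments and passes it to the command shown above. The function names build_parameters and run_template are hypothetical, and only the three required parameters are included, on the assumption that the optional ones can be left at their defaults.

```shell
#!/bin/sh
# Hypothetical helper: joins the three required parameters into the
# comma-separated KEY=VALUE string that --parameters expects.
# Arguments: $1 project ID, $2 topic name, $3 bucket name.
build_parameters() {
  printf 'inputTopic=projects/%s/topics/%s,' "$1" "$2"
  printf 'outputDirectory=gs://%s/output/,' "$3"
  printf 'avroTempDirectory=gs://%s/temp/\n' "$3"
}

# Hypothetical wrapper around the gcloud command shown above.
# Arguments: $1 job name, $2 region, $3 template version,
# $4 staging location, $5 project ID, $6 topic name, $7 bucket name.
run_template() {
  gcloud dataflow jobs run "$1" \
    --gcs-location "gs://dataflow-templates-$2/$3/Cloud_PubSub_to_Avro" \
    --region "$2" \
    --staging-location "$4" \
    --parameters "$(build_parameters "$5" "$6" "$7")"
}
```

Defining the functions has no side effects; a job is only submitted when run_template is called with real values.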

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Cloud_PubSub_to_Avro
{
   "jobName": "JOB_NAME",
   "environment": {
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "avroTempDirectory": "gs://BUCKET_NAME/temp/",
       "outputFilenamePrefix": "FILENAME_PREFIX",
       "outputFilenameSuffix": "FILENAME_SUFFIX",
       "outputShardTemplate": "SHARD_TEMPLATE"
   }
}

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • LOCATION: the region where you want to deploy your Dataflow job—for example, us-central1
  • VERSION: the version of the template that you want to use, for example latest for the most recent version
  • TOPIC_NAME: the Pub/Sub topic name
  • BUCKET_NAME: the name of your Cloud Storage bucket
  • FILENAME_PREFIX: the preferred output filename prefix
  • FILENAME_SUFFIX: the preferred output filename suffix
  • SHARD_TEMPLATE: the preferred output shard template
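
One way to send the request above from a shell is with curl, using an access token from gcloud auth print-access-token. This is a sketch under stated assumptions rather than the only supported method: the function names build_launch_url and launch_template are hypothetical, and the JSON body is assumed to be saved in a local file whose path is passed as an argument.

```shell
#!/bin/sh
# Hypothetical helper: builds the templates:launch URL shown above.
# Arguments: $1 project ID, $2 location, $3 template version.
build_launch_url() {
  printf 'https://dataflow.googleapis.com/v1b3/projects/%s/locations/%s/templates:launch?gcsPath=gs://dataflow-templates-%s/%s/Cloud_PubSub_to_Avro\n' \
    "$1" "$2" "$2" "$3"
}

# Hypothetical wrapper: POSTs a JSON request body to the launch URL.
# Arguments: $1 project ID, $2 location, $3 template version,
# $4 path to a file containing the JSON request body.
# Assumes curl and an authenticated gcloud CLI are installed.
launch_template() {
  curl -X POST \
    -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    -H "Content-Type: application/json" \
    -d @"$4" \
    "$(build_launch_url "$1" "$2" "$3")"
}
```

The same location value is used both in the request path and in the template bucket name, mirroring the LOCATION placeholder in the request above.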

What's next