Skip to content

Instantly share code, notes, and snippets.

@nicogaspa
Last active March 26, 2021 21:50
Show Gist options
  • Save nicogaspa/498b07021a727b65ce4e824aec0fc034 to your computer and use it in GitHub Desktop.
Save nicogaspa/498b07021a727b65ce4e824aec0fc034 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
import argparse
import json
import os
import logging
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
# Default pipeline endpoints; run() uses these as the argparse defaults.
# Pub/Sub subscription to read from.
INPUT_SUBSCRIPTION = "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME"
# Destination BigQuery table.
BIGQUERY_TABLE = "PROJECT_ID:DATASET_NAME.TABLE_NAME"
# Textual schema of the destination table (comma-separated name:TYPE pairs).
BIGQUERY_SCHEMA = "timestamp:TIMESTAMP,attr1:FLOAT,msg:STRING"

# Service account key path (unconditionally overrides any existing value).
os.environ.update(GOOGLE_APPLICATION_CREDENTIALS="credentials/e62e3e6ed7d7.json")

# basicConfig() is a no-op when the root logger already has handlers, so the
# root level is additionally set explicitly to guarantee INFO output.
logging.basicConfig(level=logging.INFO)
logging.root.setLevel(logging.INFO)
class CustomParsing(beam.DoFn):
    """Custom ParallelDo class that turns raw messages into row dictionaries.

    Each input element is expected to be a UTF-8 encoded JSON object. The
    decoded object is emitted with an extra ``"timestamp"`` key holding the
    element timestamp in RFC 3339 form. Messages that cannot be decoded, or
    whose JSON root is not an object, are logged and dropped.
    """

    def to_runner_api_parameter(self, unused_context):
        """Return the ``(urn, payload)`` pair describing this DoFn.

        Not very relevant for the pipeline logic: a fixed custom URN
        (uniform resource name) is returned together with a ``None`` payload.
        """
        return "beam:transforms:custom_parsing:custom_v0", None

    def process(
        self,
        element: bytes,
        timestamp=beam.DoFn.TimestampParam,
        window=beam.DoFn.WindowParam,
    ):
        """Parse one message and attach its timestamp.

        For additional params see:
        https://beam.apache.org/releases/pydoc/2.7.0/apache_beam.transforms.core.html#apache_beam.transforms.core.DoFn

        Args:
            element: Raw message payload (UTF-8 encoded JSON object).
            timestamp: Element timestamp, requested through
                ``beam.DoFn.TimestampParam``.
            window: Element window, requested through
                ``beam.DoFn.WindowParam``; not used by this implementation.

        Yields:
            The decoded dictionary with ``"timestamp"`` set to
            ``timestamp.to_rfc3339()``. Nothing is yielded for malformed
            messages.
        """
        try:
            parsed = json.loads(element.decode("utf-8"))
        except (UnicodeDecodeError, json.JSONDecodeError) as err:
            # Raising here fails the whole bundle; streaming runners retry
            # failed bundles, so a single malformed message could block the
            # pipeline. Log and drop it instead.
            logging.warning(
                "Dropping undecodable message (%d bytes): %s", len(element), err
            )
            return

        if not isinstance(parsed, dict):
            # Valid JSON, but not an object: there is nothing to attach the
            # timestamp key to and it cannot be written as a table row.
            logging.warning(
                "Dropping message whose JSON root is %s, expected an object",
                type(parsed).__name__,
            )
            return

        parsed["timestamp"] = timestamp.to_rfc3339()
        yield parsed
def run(argv=None, save_main_session=True):
    """Build and run the streaming Pub/Sub -> BigQuery pipeline.

    Args:
        argv: Command-line arguments to parse. ``None`` (the default) makes
            argparse read ``sys.argv[1:]``, which is the original behavior.
            Arguments not recognized here are forwarded to Beam as pipeline
            options.
        save_main_session: When true, enable Beam's ``save_main_session``
            option so that module-level imports used by ``CustomParsing``
            (such as ``json``) are available on remote workers. When false
            the option is left as given on the command line.
    """
    # Imported locally so this function stands on its own without changes to
    # the module's import block.
    from apache_beam.options.pipeline_options import SetupOptions

    # Parsing arguments
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input_subscription",
        help='Input PubSub subscription of the form "projects/<PROJECT>/subscriptions/<SUBSCRIPTION>."',
        default=INPUT_SUBSCRIPTION,
    )
    parser.add_argument(
        "--output_table", help="Output BigQuery Table", default=BIGQUERY_TABLE
    )
    parser.add_argument(
        "--output_schema",
        help="Output BigQuery Schema in text format",
        default=BIGQUERY_SCHEMA,
    )
    known_args, pipeline_args = parser.parse_known_args(argv)

    # Creating pipeline options
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(StandardOptions).streaming = True
    if save_main_session:
        # Only ever switch the option on, so an explicit
        # --save_main_session flag is never overridden with False.
        pipeline_options.view_as(SetupOptions).save_main_session = True

    # Defining our pipeline and its steps
    with beam.Pipeline(options=pipeline_options) as p:
        (
            p
            | "ReadFromPubSub" >> beam.io.gcp.pubsub.ReadFromPubSub(
                subscription=known_args.input_subscription, timestamp_attribute=None
            )
            | "CustomParse" >> beam.ParDo(CustomParsing())
            | "WriteToBigQuery" >> beam.io.WriteToBigQuery(
                known_args.output_table,
                schema=known_args.output_schema,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            )
        )
# Script entry point: start the pipeline only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment