Skip to content

Instantly share code, notes, and snippets.

@sunahsuh
Created March 29, 2019 19:38
Show Gist options
  • Save sunahsuh/e7c76fd91f426ab286e62597a5581299 to your computer and use it in GitHub Desktop.
Save sunahsuh/e7c76fd91f426ab286e62597a5581299 to your computer and use it in GitHub Desktop.
import argparse
import logging
import apache_beam as beam
import apache_beam.transforms.window as window
from apache_beam.transforms.core import Filter
from apache_beam.examples.wordcount import WordExtractingDoFn
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.io.gcp.bigquery import BigQueryDisposition
from base64 import b64decode
from binascii import b2a_base64
from zlib import decompress, MAX_WBITS
import json
def parse_data(data):
    """Decompress a raw PubSub message payload into the JSON ping bytes.

    Args:
        data: gzip- or zlib-compressed bytes from ``message.data``.

    Returns:
        The decompressed payload as bytes.
    """
    # MAX_WBITS | 32 tells zlib to auto-detect a gzip or zlib header.
    # NOTE: the original code ran ``b64decode(b2a_base64(data))`` first,
    # which is an encode-then-decode identity transform for bytes input;
    # that redundant round-trip has been removed.
    return decompress(data, MAX_WBITS | 32)
def safe_index(arr, idx):
    """Return ``arr[idx]``, or None when the index is out of range.

    Args:
        arr: any indexable sequence.
        idx: integer index to look up.

    Returns:
        The element at ``idx``, or None on IndexError.
    """
    try:
        return arr[idx]
    except IndexError:
        # The original had a bare ``None`` expression statement here; the
        # function only returned None by falling off the end. Make it explicit.
        return None
class FlatMapEventsFn(beam.DoFn):
    """Expand one event ping message into one output row per event.

    Each element returned is a dict shaped for the downstream BigQuery
    sink: a submission timestamp, a JSON-serialized ``payload`` string,
    and a run id.
    """

    def process(self, message):
        """Parse a PubSub message and flatten its events.

        Args:
            message: a PubSub message with compressed JSON ping bytes in
                ``.data`` and string metadata in ``.attributes``.

        Returns:
            A list of row dicts, one per telemetry event in the ping.
        """
        ping = json.loads(parse_data(message.data))
        attrs = message.attributes
        rows = []
        # The events payload maps a process name to a list of event arrays.
        # BUG FIX: ``iteritems()`` is Python 2-only; ``items()`` works on
        # both Python 2 and 3. Also renamed the loop variable so it no
        # longer shadows this method's own name.
        for _process_name, events in ping["payload"]["events"].items():
            for event in events:
                # Event array layout (by position): indices 2-5 are
                # method, object, string value, and map values.
                # Trailing fields are optional, hence safe_index.
                as_dict = {
                    "doc_type": attrs["document_type"],
                    "client_id": ping["clientId"],
                    "normalized_channel": attrs["app_update_channel"],
                    "country": attrs["geo_country"],
                    "locale": ping["environment"]["settings"]["locale"],
                    "app_name": attrs["app_name"],
                    "app_version": attrs["app_version"],
                    "os": ping["environment"]["system"]["os"]["name"],
                    "os_version": ping["environment"]["system"]["os"]["version"],
                    # Hard-coded placeholder sample id — TODO confirm intent.
                    "sample_id": 1000,
                    "event_string_value": safe_index(event, 4),
                    "event_object": safe_index(event, 3),
                    "event_method": safe_index(event, 2),
                    "event_map_values": safe_index(event, 5),
                }
                rows.append({
                    "timestamp": attrs['submission_timestamp'],
                    "payload": json.dumps(as_dict),
                    # Hard-coded run id — TODO confirm intent.
                    "run_id": 3,
                })
        return rows
def run(argv=None):
    """Build and run the streaming event pipeline.

    Reads event pings from PubSub, flattens them into per-event rows,
    and writes the rows to an existing BigQuery table.
    """
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument(
        '--output_endpoint', required=True,
        help='Output http endpoint')
    arg_parser.add_argument(
        '--input_topic',
        help=('Input PubSub topic of the form '
              '"projects/<PROJECT>/topics/<TOPIC>."'))
    arg_parser.add_argument(
        '--output_table',
        help='Destination bigquery table')
    arg_parser.add_argument(
        '--output_dataset',
        help='Dataset containing the destination bigquery table')
    arg_parser.add_argument(
        '--output_project',
        help='Project containing the destination bigquery table')
    opts, beam_args = arg_parser.parse_known_args(argv)

    # save_main_session is required because the DoFns above rely on
    # module-level globals (imports and helper functions).
    pipeline_options = PipelineOptions(beam_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    pipeline_options.view_as(StandardOptions).streaming = True

    pipeline = beam.Pipeline(options=pipeline_options)

    # Source: the PubSub topic, with message attributes preserved and the
    # event time taken from the submission_timestamp attribute.
    raw_messages = (
        pipeline
        | beam.io.ReadFromPubSub(
            topic=opts.input_topic,
            with_attributes=True,
            timestamp_attribute="submission_timestamp"))

    # Keep only event pings, then flatten each ping into per-event rows.
    event_rows = (
        raw_messages
        | 'filter_event_pings' >> Filter(
            lambda m: m.attributes['document_type'] == "event")
        | 'message_to_dict' >> beam.ParDo(FlatMapEventsFn()))

    # Sink: an existing BigQuery table (never auto-created).
    event_rows | beam.io.WriteToBigQuery(
        opts.output_table,
        dataset=opts.output_dataset,
        project=opts.output_project,
        create_disposition=BigQueryDisposition.CREATE_NEVER)

    result = pipeline.run()
    result.wait_until_finish()
# Script entry point: enable INFO-level logging so Beam pipeline progress
# messages are visible, then build and run the pipeline.
if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment