Created
March 29, 2019 19:38
-
-
Save sunahsuh/e7c76fd91f426ab286e62597a5581299 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import logging | |
import apache_beam as beam | |
import apache_beam.transforms.window as window | |
from apache_beam.transforms.core import Filter | |
from apache_beam.examples.wordcount import WordExtractingDoFn | |
from apache_beam.options.pipeline_options import PipelineOptions | |
from apache_beam.options.pipeline_options import SetupOptions | |
from apache_beam.options.pipeline_options import StandardOptions | |
from apache_beam.io.gcp.bigquery import BigQueryDisposition | |
from base64 import b64decode | |
from binascii import b2a_base64 | |
from zlib import decompress, MAX_WBITS | |
import json | |
def parse_data(data):
    """Decompress a raw message body and return the decompressed bytes.

    Args:
        data: Bytes-like compressed payload.

    Returns:
        The decompressed bytes.

    Raises:
        zlib.error: if ``data`` is not a valid compressed stream.
    """
    # The previous implementation base64-encoded the bytes and immediately
    # decoded them again, which yields the input unchanged; the round trip
    # is dropped. ``MAX_WBITS | 32`` makes zlib auto-detect a gzip or zlib
    # header.
    return decompress(data, MAX_WBITS | 32)
def safe_index(arr, idx):
    """Return ``arr[idx]``, or ``None`` when ``idx`` is out of range.

    Args:
        arr: Indexable sequence.
        idx: Position to read; negative indexes follow normal sequence rules.

    Returns:
        The element at ``idx``, or ``None`` if indexing raises ``IndexError``.
    """
    try:
        return arr[idx]
    except IndexError:
        # Make the fallback explicit instead of relying on a bare ``None``
        # expression and the implicit function return.
        return None
class FlatMapEventsFn(beam.DoFn):
    """Expand one ping message into one output row per event it contains."""

    def process(self, message):
        """Decode the ping in ``message`` and build a row for every event.

        Args:
            message: Object exposing ``data`` (the compressed JSON ping) and
                ``attributes`` (a mapping read for ``document_type``,
                ``app_update_channel``, ``geo_country``, ``app_name``,
                ``app_version`` and ``submission_timestamp``).

        Returns:
            A list of dicts with the keys ``timestamp``, ``payload`` (a JSON
            string describing the event) and ``run_id``.

        Raises:
            KeyError: if an expected attribute or ping field is missing.
        """
        ping = json.loads(parse_data(message.data))
        attributes = message.attributes
        final_events = []
        # ``ping["payload"]["events"]`` maps a per-process key to a list of
        # events. Only the lists are needed, so iterate ``.values()``; the
        # former ``.iteritems()`` call exists only on Python 2.
        for events in ping["payload"]["events"].values():
            for event in events:
                # NOTE(review): the positional meaning of indexes 2-5 is
                # taken from the key names used here -- confirm against the
                # ping schema. Missing positions become None.
                as_dict = {
                    "doc_type": attributes["document_type"],
                    "client_id": ping["clientId"],
                    "normalized_channel": attributes["app_update_channel"],
                    "country": attributes["geo_country"],
                    "locale": ping["environment"]["settings"]["locale"],
                    "app_name": attributes["app_name"],
                    "app_version": attributes["app_version"],
                    "os": ping["environment"]["system"]["os"]["name"],
                    "os_version": ping["environment"]["system"]["os"]["version"],
                    # Hard-coded placeholder value in this job.
                    "sample_id": 1000,
                    "event_string_value": safe_index(event, 4),
                    "event_object": safe_index(event, 3),
                    "event_method": safe_index(event, 2),
                    "event_map_values": safe_index(event, 5),
                }
                final_events.append({
                    "timestamp": attributes['submission_timestamp'],
                    "payload": json.dumps(as_dict),
                    # Hard-coded run identifier for this job.
                    "run_id": 3,
                })
        return final_events
def run(argv=None):
    """Parse command-line flags, then assemble and execute the pipeline.

    Flags declared here are consumed locally; any other entries in ``argv``
    are handed to Beam as pipeline options. The pipeline reads PubSub
    messages with their attributes, keeps those whose ``document_type``
    attribute equals ``"event"``, expands them with ``FlatMapEventsFn`` and
    writes the rows to BigQuery. The call blocks until the pipeline
    finishes.

    Args:
        argv: Optional list of command-line arguments; ``None`` lets
            argparse fall back to ``sys.argv``.
    """
    flag_specs = (
        ('--output_endpoint',
         {'required': True, 'help': 'Output http endpoint'}),
        ('--input_topic',
         {'help': 'Input PubSub topic of the form '
                  '"projects/<PROJECT>/topics/<TOPIC>."'}),
        ('--output_table',
         {'help': 'Destination bigquery table'}),
        ('--output_dataset',
         {'help': 'Dataset containing the destination bigquery table'}),
        ('--output_project',
         {'help': 'Project containing the destination bigquery table'}),
    )
    arg_parser = argparse.ArgumentParser()
    for flag_name, flag_kwargs in flag_specs:
        arg_parser.add_argument(flag_name, **flag_kwargs)
    flags, beam_args = arg_parser.parse_known_args(argv)

    # save_main_session is enabled because the DoFns in this workflow depend
    # on module-level context (for example, modules imported at the top).
    options = PipelineOptions(beam_args)
    options.view_as(SetupOptions).save_main_session = True
    options.view_as(StandardOptions).streaming = True
    pipeline = beam.Pipeline(options=options)

    # Source: PubSub messages, including their attributes.
    pubsub_source = beam.io.ReadFromPubSub(
        topic=flags.input_topic,
        with_attributes=True,
        timestamp_attribute="submission_timestamp",
    )
    raw_messages = pipeline | pubsub_source

    # Keep only event pings, then fan each one out into per-event rows.
    event_pings = raw_messages | 'filter_event_pings' >> Filter(
        lambda msg: msg.attributes['document_type'] == "event")
    event_rows = event_pings | 'message_to_dict' >> beam.ParDo(
        FlatMapEventsFn())

    # Sink: CREATE_NEVER means this job does not create the table itself.
    bigquery_sink = beam.io.WriteToBigQuery(
        flags.output_table,
        dataset=flags.output_dataset,
        project=flags.output_project,
        create_disposition=BigQueryDisposition.CREATE_NEVER,
    )
    event_rows | bigquery_sink

    pipeline.run().wait_until_finish()
if __name__ == '__main__':
    # Script entry point: raise the root logger to INFO, then start the job.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment