Skip to content

Instantly share code, notes, and snippets.

@gxercavins
Created August 12, 2019 20:24
Show Gist options
  • Save gxercavins/c0753bff5e5883791f4c2de7e0689562 to your computer and use it in GitHub Desktop.
Save gxercavins/c0753bff5e5883791f4c2de7e0689562 to your computer and use it in GitHub Desktop.
Stack Overflow question 55370068
import argparse, datetime, logging
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
class GetTimestampFn(beam.DoFn):
    """Log each element's timestamp in UTC and pass the element through."""

    def process(self, element, timestamp=beam.DoFn.TimestampParam):
        """Log the timestamp attached to ``element`` and re-emit the element.

        Args:
            element: The element being processed; it is yielded unchanged.
            timestamp: Defaults to ``beam.DoFn.TimestampParam``, the marker
                Beam replaces with the element's timestamp at runtime. The
                value is converted with ``float()`` and interpreted as
                seconds since the Unix epoch.

        Yields:
            The input ``element``, unmodified.
        """
        # datetime.utcfromtimestamp() is deprecated since Python 3.12 and
        # returns a naive datetime; build a timezone-aware UTC datetime
        # instead. The formatted string below is identical either way.
        timestamp_utc = datetime.datetime.fromtimestamp(
            float(timestamp), tz=datetime.timezone.utc)
        logging.info(
            ">>> Element timestamp: %s",
            timestamp_utc.strftime("%Y-%m-%d %H:%M:%S"))
        yield element
def run(argv=None):
    """Build and run a pipeline that logs Pub/Sub message timestamps.

    Reads messages from a Pub/Sub topic, applies ``GetTimestampFn`` to each
    one, and blocks until the pipeline finishes.

    Args:
        argv: Command-line arguments to parse. ``None`` makes argparse fall
            back to ``sys.argv[1:]``. ``--topic`` is consumed here; every
            argument this parser does not recognise is forwarded to
            ``PipelineOptions``.
    """
    parser = argparse.ArgumentParser()
    # The topic used to be hard-coded in the read transform; the default
    # keeps the original value so existing invocations behave the same.
    parser.add_argument(
        '--topic',
        default='projects/PROJECT/topics/TOPIC',
        help='Pub/Sub topic to read from, in the form '
             'projects/<PROJECT>/topics/<TOPIC>.')
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    # Save the main session so module-level state (e.g. the imports used
    # inside GetTimestampFn) is pickled along with the pipeline.
    pipeline_options.view_as(SetupOptions).save_main_session = True

    p = beam.Pipeline(options=pipeline_options)

    (p
     | 'Read Messages' >> beam.io.ReadFromPubSub(topic=known_args.topic)
     | 'Print Timestamps' >> beam.ParDo(GetTimestampFn()))

    result = p.run()
    result.wait_until_finish()
if __name__ == '__main__':
    # Raise the root logger to INFO so the logging.info() calls made while
    # the pipeline runs are actually emitted.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment