@ayush-poddar
Created August 27, 2021 05:16
Items are retried indefinitely; even when using `RetryStrategy.RETRY_NEVER`, the pipeline still keeps retrying failing records.
# python main.py --streaming \
# --project=<PROJECT> \
# --topic=projects/pubsub-public-data/topics/taxirides-realtime \
# --runner=DataflowRunner \
# --region=us-central1 \
# --temp_location=<GCS_TEMP_LOCATION> \
# --staging_location=<GCS_STAGING_LOCATION> \
# --table1=<BQ_TABLE1> \
# --table2=<BQ_TABLE2>
# Note: BQ dataset needs to be present, table can be automatically created
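# The dataset can be created up front with the bq CLI, e.g.
# (<PROJECT> and <DATASET> are placeholders):
#   bq mk --dataset <PROJECT>:<DATASET>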
import json
import logging
import apache_beam as beam
from apache_beam.io.gcp.bigquery_tools import RetryStrategy
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.bigquery import BigQueryWriteFn

class CustomOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):  # type: (_BeamArgumentParser) -> None
        parser.add_argument("--topic", default="projects/pubsub-public-data/topics/taxirides-realtime")
        parser.add_argument("--table1", help="BQ Table1 to write events to")
        parser.add_argument("--table2", help="BQ Table2 to write events to")

class PrintMessages(beam.DoFn):
    """Logs the length of each element and passes it through unchanged."""
    def process(self, element, *args, **kwargs):
        logging.info(len(element))
        yield element

class FilterMessages(beam.DoFn):
    """Projects each event onto the BQ schema, adding a large synthetic payload."""
    def process(self, element, *args, **kwargs):
        record = {'large_field': 'field' * 124415,  # ~600 KB payload used in the repro
                  'passenger_count': int(element['passenger_count']),
                  'timestamp': element['timestamp'],
                  'meter_reading': int(element['meter_reading'])}
        yield record

def run_pipeline():
    pipeline_options = CustomOptions().get_all_options(retain_unknown_options=True)
    schema = {'fields': [
        {'name': 'large_field', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'passenger_count', 'type': 'INTEGER', 'mode': 'NULLABLE'},
        {'name': 'timestamp', 'type': 'TIMESTAMP', 'mode': 'NULLABLE'},
        {'name': 'meter_reading', 'type': 'FLOAT', 'mode': 'NULLABLE'}]}
    p = beam.Pipeline(options=CustomOptions())
    filtered_messages = (
        p
        | "Read from Pub/Sub" >> beam.io.ReadFromPubSub(topic=pipeline_options.get('topic'))
        | "Convert to JSON" >> beam.Map(json.loads)
        | "Print messages" >> beam.ParDo(PrintMessages())
        | "Filter fields" >> beam.ParDo(FilterMessages())
    )
    write1 = filtered_messages | "Write to BQ1" >> beam.io.WriteToBigQuery(
        pipeline_options.get('table1'), schema=schema,
        create_disposition='CREATE_IF_NEEDED', ignore_insert_ids=True,
        write_disposition='WRITE_APPEND',
        insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR, batch_size=1)
    write2 = filtered_messages | "Write to BQ2" >> beam.io.WriteToBigQuery(
        pipeline_options.get('table2'), schema=schema,
        create_disposition='CREATE_IF_NEEDED', ignore_insert_ids=True,
        write_disposition='WRITE_APPEND',
        insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR)
    # Flatten the failed-row outputs of both sinks and log them.
    ((write1[BigQueryWriteFn.FAILED_ROWS], write2[BigQueryWriteFn.FAILED_ROWS])
     | 'Combine errors' >> beam.Flatten()
     | "Print failed messages" >> beam.ParDo(PrintMessages()))
    p.run().wait_until_finish()

if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    run_pipeline()
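
For comparison, here is a minimal sketch of the `RetryStrategy.RETRY_NEVER` variant the description refers to. It reuses `filtered_messages`, `pipeline_options`, and `schema` from `run_pipeline` above; `write_never` and the step labels are hypothetical names. With this strategy, failed rows should be emitted once on `FAILED_ROWS` rather than retried.

    # Sketch: same sink, but with RETRY_NEVER instead of RETRY_ON_TRANSIENT_ERROR.
    write_never = filtered_messages | "Write to BQ (no retry)" >> beam.io.WriteToBigQuery(
        pipeline_options.get('table1'), schema=schema,
        create_disposition='CREATE_IF_NEEDED', ignore_insert_ids=True,
        write_disposition='WRITE_APPEND',
        insert_retry_strategy=RetryStrategy.RETRY_NEVER, batch_size=1)
    # Failed rows should appear here exactly once under RETRY_NEVER.
    (write_never[BigQueryWriteFn.FAILED_ROWS]
     | "Log never-retried rows" >> beam.ParDo(PrintMessages()))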