Created
August 27, 2021 05:16
-
-
Save ayush-poddar/dba28e298ed4227dc12f3e2d40d3e25c to your computer and use it in GitHub Desktop.
Items are retried indefinitely: even when using `RetryStrategy.RETRY_NEVER`, the pipeline still keeps retrying failing records
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# python main.py --streaming \
#   --project=<PROJECT> \
#   --topic=projects/pubsub-public-data/topics/taxirides-realtime \
#   --runner=DataflowRunner \
#   --region=us-central1 \
#   --temp_location=<GCS_TEMP_LOCATION> \
#   --staging_location=<GCS_STAGING_LOCATION> \
#   --table1=<BQ_TABLE1> \
#   --table2=<BQ_TABLE2>
# Note: the BQ dataset needs to exist already; the tables can be created automatically.
# Standard library.
import json
import logging

# Apache Beam SDK.
import apache_beam as beam
from apache_beam.io.gcp.bigquery import BigQueryWriteFn
from apache_beam.io.gcp.bigquery_tools import RetryStrategy
from apache_beam.options.pipeline_options import PipelineOptions
class CustomOptions(PipelineOptions):
    """Pipeline options for the Pub/Sub input topic and the two BigQuery output tables."""

    @classmethod
    def _add_argparse_args(cls, parser):  # type: (_BeamArgumentParser) -> None
        # Default to the public taxirides topic so the pipeline runs out of the box.
        parser.add_argument("--topic", default="projects/pubsub-public-data/topics/taxirides-realtime")
        # Fixed: a stray trailing comma previously turned this statement into a
        # one-element tuple expression (harmless at runtime, but a latent typo).
        parser.add_argument("--table1", help="BQ Table1 to write events to")
        parser.add_argument("--table2", help="BQ Table2 to write events to")
class PrintMessages(beam.DoFn):
    """Pass-through DoFn that logs the size (``len``) of each element it sees."""

    def process(self, element, *args, **kwargs):
        # Log how big the record is, then re-emit it untouched.
        size = len(element)
        logging.info(size)
        yield element
class FilterMessages(beam.DoFn):
    """Project each taxi-ride message onto the output schema, adding a large padding field."""

    def process(self, element, *args, **kwargs):
        # Build the output row field by field. 'large_field' deliberately inflates
        # the row size (the string 'field' repeated 124415 times).
        # NOTE(review): meter_reading is cast to int here while the BQ schema
        # declares it FLOAT — looks intentional for this repro, confirm.
        row = {}
        row['large_field'] = 'field' * 124415
        row['passenger_count'] = int(element['passenger_count'])
        row['timestamp'] = element['timestamp']
        row['meter_reading'] = int(element['meter_reading'])
        yield row
def run_pipeline():
    """Build and run a streaming pipeline mirroring taxi-ride events into two BQ tables.

    Reads the public taxirides Pub/Sub topic, inflates each record with a large
    padding field, writes every record to two BigQuery tables, and logs any rows
    that BigQuery rejects (the FAILED_ROWS side output).
    """
    # Fixed: construct CustomOptions once and reuse it, instead of building a
    # second, separate options object just for beam.Pipeline.
    options = CustomOptions()
    # retain_unknown_options lets runner-specific flags (e.g. Dataflow's) pass through.
    pipeline_options = options.get_all_options(retain_unknown_options=True)
    schema = {'fields': [
        {'name': 'large_field', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'passenger_count', 'type': 'INTEGER', 'mode': 'NULLABLE'},
        {'name': 'timestamp', 'type': 'TIMESTAMP', 'mode': 'NULLABLE'},
        {'name': 'meter_reading', 'type': 'FLOAT', 'mode': 'NULLABLE'}]}
    p = beam.Pipeline(options=options)
    filtered_messages = (
        p | "Read from Pub/Sub" >> beam.io.ReadFromPubSub(topic=pipeline_options.get('topic'))
        # json.loads passed directly — no need to wrap it in a lambda.
        | "Convert to JSON" >> beam.Map(json.loads)
        | "Print messages" >> beam.ParDo(PrintMessages())
        | "Filter fields" >> beam.ParDo(FilterMessages())
    )
    # batch_size=1 forces one insert per row on table1, so failures surface per record.
    write1 = filtered_messages | "Write to BQ1" >> beam.io.WriteToBigQuery(
        pipeline_options.get('table1'), schema=schema,
        create_disposition='CREATE_IF_NEEDED', ignore_insert_ids=True,
        write_disposition='WRITE_APPEND',
        insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR, batch_size=1)
    write2 = filtered_messages | "Write to BQ2" >> beam.io.WriteToBigQuery(
        pipeline_options.get('table2'), schema=schema,
        create_disposition='CREATE_IF_NEEDED', ignore_insert_ids=True,
        write_disposition='WRITE_APPEND',
        insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR)
    # Rows BigQuery permanently rejects come out on the FAILED_ROWS side output
    # of each write transform; flatten both streams and log them.
    ((write1[BigQueryWriteFn.FAILED_ROWS], write2[BigQueryWriteFn.FAILED_ROWS])
     | 'Combine errors' >> beam.Flatten()
     | "Print failed messages" >> beam.ParDo(PrintMessages()))
    p.run().wait_until_finish()
if __name__ == "__main__":
    # Raise the root logger to INFO so the per-record messages from the DoFns show up.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    run_pipeline()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment