Skip to content

Instantly share code, notes, and snippets.

@awoods
Created November 22, 2024 01:46
Show Gist options
  • Save awoods/e3a2575c21e18a1800ae5bea675dcf74 to your computer and use it in GitHub Desktop.
Save awoods/e3a2575c21e18a1800ae5bea675dcf74 to your computer and use it in GitHub Desktop.
Code Sample
@app.task(serializer='json',
          name='jstor_aspace_aggregator.tasks.aggregate', queue=queue)
def aggregate(json_message):  # pragma: no cover
    """Run the aggregation stage for one message, then pass the message on.

    A ``Worker`` is built on mock utilities when the message contains
    ``"unit_test"`` and on ``S3Util``/``MongoUtil`` otherwise.  The worker
    then runs ``aggregate``, ``create_container_stubs`` and
    ``retry_aggregate`` in that order, and the counts each one returns are
    logged at debug level.

    Args:
        json_message: The incoming task payload.  This function only tests
            it for the keys ``"unit_test"``, ``"local_integration_test"``,
            ``"integration_test"``, ``"aggregation_only"`` and
            ``"harvestset"``, and hands it unchanged to the worker.
            Presumably a dict decoded from JSON -- confirm with the caller.

    Returns:
        The same message object for unit tests, local integration tests
        and aggregation-only integration tests.  ``None`` (implicitly)
        after the message has been sent to the queue named by the
        ``NEXT_QUEUE_NAME`` environment variable.
    """
    logger.info("message")
    logger.info(json_message)

    # The message is forwarded exactly as received; this is an alias for
    # the same object, not a copy.
    outgoing = json_message

    # Unit tests run against mock storage and database utilities.
    if "unit_test" in json_message:
        storage, database = MockS3Util(), MockMongoUtil()
    else:
        storage, database = S3Util(), MongoUtil()
    aggregator = Worker(storage, database)

    # Integration runs (local or not) use the test set list from the
    # environment instead of the worker's own AGGREGATION_SETS.
    integration_flags = ("local_integration_test", "integration_test")
    if any(flag in json_message for flag in integration_flags):
        aggregator.AGGREGATION_SETS = os.getenv('AGGREGATION_TEST_SETS')

    record_count, file_count = aggregator.aggregate(json_message)
    logger.debug(f"agg1: record_count: {record_count}, "
                 f"file_count: {file_count}")

    work_count, item_count = aggregator.create_container_stubs(json_message)
    logger.debug(f"agg2: work_count: {work_count}, item_count: {item_count}")

    # Retry failed containers; the scope label is only used for logging.
    scope = (f"set {json_message['harvestset']}"
             if "harvestset" in json_message else "all sets")
    logger.debug(f"retrying bad containers for {scope}...")
    retry_stats = aggregator.retry_aggregate(json_message)
    retried, fixed, failed, cleared = retry_stats
    logger.debug(f"retry bad containers: retried files: {retried}, "
                 f"fixed aggregate records: {fixed}, "
                 f"failed records {failed}, "
                 f"errors fixed: {cleared}")

    # Unit tests and local integration tests stop here: the message is
    # returned and no follow-up task is triggered.
    if ("unit_test" in json_message
            or "local_integration_test" in json_message):
        return outgoing

    # An aggregation-only integration test is sent to the integration
    # test queue rather than to the next stage.
    aggregation_only_test = ("aggregation_only" in json_message
                             and "integration_test" in json_message)
    if aggregation_only_test:
        app.send_task("tasks.tasks.do_aggregation_task",
                      args=[outgoing], kwargs={},
                      queue=integration_test_queue)
        return outgoing

    # Unit tests end before this call and must not progress the message.
    app.send_task(  # pragma: no cover
        "tasks.tasks.do_task",
        args=[outgoing], kwargs={},
        queue=os.getenv('NEXT_QUEUE_NAME'))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment