Created
March 10, 2021 00:24
-
-
Save onefoursix/0a30bba03286e1f246e249421431284a to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Hold the import lock provided by the `sdc` bindings while importing, and
# always release it in `finally`, even if the import itself raises.
try:
    sdc.importLock()
    from datetime import datetime
finally:
    sdc.importUnlock()
# strftime pattern used to render the window boundaries on each record.
date_time_format = '%Y-%m-%d %H:%M:%S'

## Offsets are stored under the key held in `offsetKey`, as a String with a
## value like this:
##     startDate=1609459200&endDate=1609462800
## The startDate and endDate values are stored as epoch seconds.

# The top level offset key
offsetKey = 'startDate_and_endDate'

# startDate and endDate offset child keys
startDateOffsetKey = 'startDate'
endDateOffsetKey = 'endDate'

# Default values for startDate and endDate if no offsets exist
defaultStartDateSeconds = '1609459200'  # January 1, 2021 12:00:00 AM GMT
defaultEndDateSeconds = '1609462800'    # January 1, 2021 1:00:00 AM GMT

# Get the previously committed offset, or build one from the default values.
if sdc.lastOffsets.containsKey(offsetKey):
    offset = sdc.lastOffsets.get(offsetKey)
else:
    offset = '{}={}&{}={}'.format(
        startDateOffsetKey, defaultStartDateSeconds,
        endDateOffsetKey, defaultEndDateSeconds)

# Parse the 'key=value&key=value' offset by key name rather than by position,
# so the result does not depend on the order of the fields in the string.
# A malformed offset still fails loudly (KeyError / ValueError) instead of
# silently restarting from the defaults.
offset_fields = dict(pair.split('=', 1) for pair in offset.split('&'))

# `long` is the Python 2 / Jython integer type for large values.
start_date_seconds = long(offset_fields[startDateOffsetKey])
end_date_seconds = long(offset_fields[endDateOffsetKey])
# Each generated record advances the [start, end) window by this many seconds.
window_step_seconds = 60 * 60

cur_batch = sdc.createBatch()

# Bind `record` before the loop so the `except` handler below always has a
# record to attach the error to, even if the first createRecord call fails.
record = sdc.createRecord('record created ' + str(datetime.now()))

hasNext = True
while hasNext:
    try:
        # Create a record carrying the current window boundaries.
        # NOTE(review): datetime.fromtimestamp renders in the local timezone
        # of the host, while the default offsets are documented as GMT --
        # confirm whether UTC output is intended.
        record = sdc.createRecord('record created ' + str(datetime.now()))
        record.value = {}
        record.value['start_date'] = datetime.fromtimestamp(
            start_date_seconds).strftime(date_time_format)
        record.value['end_date'] = datetime.fromtimestamp(
            end_date_seconds).strftime(date_time_format)
        cur_batch.add(record)

        # Simulate progress by bumping the startDate and endDate offsets
        # by 1 hour.
        start_date_seconds += window_step_seconds
        end_date_seconds += window_step_seconds
        offset = '{}={}&{}={}'.format(
            startDateOffsetKey, start_date_seconds,
            endDateOffsetKey, end_date_seconds)

        # If the batch is full, process it and start a new one.
        if cur_batch.size() >= sdc.batchSize:
            # Blocks until all records are written to all destinations
            # (or failure) and updates the offset in accordance with the
            # delivery guarantee.
            cur_batch.process(offsetKey, str(offset))
            cur_batch = sdc.createBatch()

            # If the pipeline has been stopped, we should end the script.
            if sdc.isStopped():
                hasNext = False
    except Exception as e:
        # Report the failure against the most recent record, hand the batch
        # over with the latest offset, and stop generating.
        cur_batch.addError(record, str(e))
        cur_batch.process(offsetKey, str(offset))
        hasNext = False

# Flush whatever is still pending (records, errors or events) on exit.
if cur_batch.size() + cur_batch.errorCount() + cur_batch.eventCount() > 0:
    cur_batch.process(offsetKey, str(offset))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment