Skip to content

Instantly share code, notes, and snippets.

@onefoursix
Created March 10, 2021 00:24
Show Gist options
  • Save onefoursix/0a30bba03286e1f246e249421431284a to your computer and use it in GitHub Desktop.
Save onefoursix/0a30bba03286e1f246e249421431284a to your computer and use it in GitHub Desktop.
# SDC's Jython evaluator requires importLock()/importUnlock() around imports
# to serialize them: concurrent imports are not thread-safe in Jython.
# The finally clause guarantees the lock is released even if the import fails.
try:
    sdc.importLock()
    from datetime import datetime
finally:
    sdc.importUnlock()
# Format used for the start_date / end_date record fields.
date_time_format = '%Y-%m-%d %H:%M:%S'

## Offsets will be stored with the key 'offsetKey' as a String with a value like this:
## startDate=1609459200&endDate=1609462800
## The startDate and endDate values are stored as epoch seconds

# The top level offset key
offsetKey = 'startDate_and_endDate'

# startDate and endDate offset child keys (currently unused; kept for reference)
startDateOffsetKey = 'startDate'
endDateOffsetKey = 'endDate'

# Default values for startDate and endDate if no offsets exist
defaultStartDateSeconds = '1609459200' # January 1, 2021 12:00:00 AM GMT
defaultEndDateSeconds = '1609462800' # January 1, 2021 1:00:00 AM GMT

# Get previously committed offsets or fall back to the defaults.
if sdc.lastOffsets.containsKey(offsetKey):
    offset = sdc.lastOffsets.get(offsetKey)
else:
    offset = 'startDate={}&endDate={}'.format(defaultStartDateSeconds, defaultEndDateSeconds)

# Parse the 'startDate=...&endDate=...' string back into epoch seconds.
# int() auto-promotes on overflow in Jython 2.7, so the Python-2-only long()
# is unnecessary and this stays valid under Python 3 as well.
start_date_seconds = int(offset.split('&')[0].split('=')[1])
end_date_seconds = int(offset.split('&')[1].split('=')[1])
cur_batch = sdc.createBatch()

# Pre-create a record so the except handler below always has a bound 'record'
# to attach the error to, even if sdc.createRecord itself raises on the first
# iteration of the loop.
record = sdc.createRecord('record created ' + str(datetime.now()))

hasNext = True
while hasNext:
    try:
        # Create a record carrying the current time window as formatted strings.
        record = sdc.createRecord('record created ' + str(datetime.now()))
        record.value = {}
        record.value['start_date'] = datetime.fromtimestamp(start_date_seconds).strftime(date_time_format)
        record.value['end_date'] = datetime.fromtimestamp(end_date_seconds).strftime(date_time_format)
        cur_batch.add(record)

        # Simulate progress by bumping the startDate and endDate offsets by 1 hour
        start_date_seconds += 60 * 60
        end_date_seconds += 60 * 60
        offset = 'startDate={}&endDate={}'.format(start_date_seconds, end_date_seconds)

        # If the batch is full, process it and start a new one.
        if cur_batch.size() >= sdc.batchSize:
            # process() blocks until all records are written to all destinations
            # (or failure) and updates the committed offset in accordance with
            # the pipeline's delivery guarantee.
            cur_batch.process(offsetKey, str(offset))
            cur_batch = sdc.createBatch()

        # If the pipeline has been stopped, end the script.
        if sdc.isStopped():
            hasNext = False
    except Exception as e:
        # Send the failing record to the error stream, commit what has been
        # produced so far, and stop generating.
        cur_batch.addError(record, str(e))
        cur_batch.process(offsetKey, str(offset))
        hasNext = False

# Flush any remaining records, error records, or events before exiting.
if cur_batch.size() + cur_batch.errorCount() + cur_batch.eventCount() > 0:
    cur_batch.process(offsetKey, str(offset))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment