Skip to content

Instantly share code, notes, and snippets.

@onefoursix
Created August 11, 2021 21:00
Show Gist options
  • Save onefoursix/1b911a1ca297056623a46d685b216cef to your computer and use it in GitHub Desktop.
Save onefoursix/1b911a1ca297056623a46d685b216cef to your computer and use it in GitHub Desktop.
Jython example for SDC Scripting Origin
## Example Scripting Origin to call a REST API with pagination and offset handling
# Imports
try:
    # Acquire SDC's import lock before importing modules; it is always released
    # in the finally clause below. (Presumably this serializes imports across
    # scripts running in the same Data Collector -- see the SDC scripting docs.)
    sdc.importLock()
    import sys
    ## Set this path to where we can load the Requests module
    # NOTE(review): Jython executes Python 2.7 code, so packages installed for
    # CPython 3.9 may not import under it -- confirm this directory holds a
    # Jython-compatible copy of requests.
    sys.path.append('/usr/local/lib/python3.9/site-packages')
    import datetime
    # json and HTTPBasicAuth are not referenced elsewhere in this script;
    # presumably kept for customization (e.g. session.auth = HTTPBasicAuth(...)).
    import json
    import requests
    from requests.auth import HTTPBasicAuth
    import time
finally:
    sdc.importUnlock()
# Set DEBUG to True to enable logging
# Use '$ tail -f sdc.log | grep SCRIPTING_ORIGIN' to filter the log messages
DEBUG = True
# Request headers
headers = {'Content-Type':'application/json'}
# Create a session
session = requests.session()
session.headers = headers
# The List field in the response that contains records.
# Set response_list_field = '' if the entire response is the list of records
response_list_field = 'data'
# The entity_name is the key for the offset
entity_name = 'page'
# Get previously committed offset or use the pipeline parameter "INITIAL_OFFSET"
# if there is no saved offset or if Reset Origin is used
if sdc.lastOffsets.containsKey(entity_name):
offset = int(sdc.lastOffsets.get(entity_name))
# Bump the saved offset by 1 to start reading from the next value
offset += 1
else:
offset = ${INITIAL_OFFSET}
# This example uses 'page=[value]' but could easily be changed to something
# like 'my_custom_field=[value]' with the value read from the last record in the response
initial_url_args = 'page={}'.format(offset)
# set to True for the initial REST API call
initial_call = True
# Create a batch
cur_batch = sdc.createBatch()
# We'll set "done" to True when we reach the last page of the response or if we get an error
done = False
# index each record
record_index = 0
while not done:
try:
# Construct the URL to be called
if initial_call:
current_url = '{}&{}'.format('${BASE_REST_API_URL}', initial_url_args)
initial_call = False
else:
current_url = next_page_url
# Call the REST API
if DEBUG:
sdc.log.info("SCRIPTING_ORIGIN URL: {}".format(current_url))
response = session.get(current_url)
# Sleep between calls to the API so we don't get rate-limited
time.sleep(${SLEEP_SECONDS})
# Get the response data
if response_list_field == '':
response_data = response.json()
else:
response_data = response.json()[response_list_field]
# Exit if there is no data
if response_data is None or len(response_data) == 0:
done = True
else:
# Get the number of records in the response so
# we can tell when we're processing the last message
num_records_in_response = len(response_data)
response_record_index = 0
# Handle each record in the response
for response_record in response_data:
# Bump the index
response_record_index += 1
# Create a record for each result record
record = sdc.createRecord('record created ' + str(datetime.datetime.now()) + str(record_index))
record_index += 1
# Assign the item to the record
record.value = {}
record.value['data'] = response_record
# See if we are on the last record of the response
# Use this section to capture a last read value of a field
# for example, in order to construct the next_page_url below
# rather than using a page number.
# For example, if we are on the last record:
if response_record_index == len(response_data):
last_id_in_response = response_record['_id']
# Add the record to the batch
cur_batch.add(record)
# if the batch is full, process it and start a new one
if cur_batch.size() >= sdc.batchSize:
# blocks until all records are written to all destinations
# and updates offset in accordance with delivery guarantee
cur_batch.process(entity_name, str(offset))
cur_batch = sdc.createBatch()
# Bump the offset.
# As this example uses page numbers we just add 1, but you
# could instead assign a value captured from the last record
# of the batch above
offset += 1
# Create the next page URL
# This example uses 'page=value' but you could use
# 'my_custom_field=[value]' with the value read from the last record in the response
next_page_url = '{}&page={}'.format('${BASE_REST_API_URL}', offset)
# Gracefully end the script if the pipeline has been stopped.
if sdc.isStopped():
done = True
except Exception as e:
cur_batch.addError(record, str(e))
cur_batch.process(entity_name, str(offset))
done = True
if cur_batch.size() + cur_batch.errorCount() + cur_batch.eventCount() > 0:
cur_batch.process(entity_name, str(offset))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment