Created
August 11, 2021 21:00
-
-
Save onefoursix/1b911a1ca297056623a46d685b216cef to your computer and use it in GitHub Desktop.
Jython example for the StreamSets Data Collector (SDC) Scripting Origin
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
## Example Scripting Origin to call a REST API with pagination and offset handling
# Imports
# The imports run between sdc.importLock() and sdc.importUnlock(). The lock is
# acquired before the try block so that importUnlock() in the finally clause
# only ever releases a lock that was actually obtained, even if an import fails.
sdc.importLock()
try:
    import sys
    ## Set this path to where we can load the Requests module
    # NOTE(review): this is a CPython 3.9 site-packages directory, while Jython
    # implements Python 2.7 -- confirm the Requests version installed there
    # still supports Python 2.7.
    sys.path.append('/usr/local/lib/python3.9/site-packages')
    import datetime
    import json
    import time
    import requests
    from requests.auth import HTTPBasicAuth
finally:
    sdc.importUnlock()
# Set DEBUG to True to enable logging | |
# Use '$ tail -f sdc.log | grep SCRIPTING_ORIGIN' to filter the log messages | |
DEBUG = True | |
# Request headers | |
headers = {'Content-Type':'application/json'} | |
# Create a session | |
session = requests.session() | |
session.headers = headers | |
# The List field in the response that contains records. | |
# Set response_list_field = '' if the entire response is the list of records | |
response_list_field = 'data' | |
# The entity_name is the key for the offset | |
entity_name = 'page' | |
# Get previously committed offset or use the pipeline parameter "INITIAL_OFFSET" | |
# if there is no saved offset or if Reset Origin is used | |
if sdc.lastOffsets.containsKey(entity_name): | |
offset = int(sdc.lastOffsets.get(entity_name)) | |
# Bump the saved offset by 1 to start reading from the next value | |
offset += 1 | |
else: | |
offset = ${INITIAL_OFFSET} | |
# This example uses 'page=[value]' but could easily be changed to something | |
# like 'my_custom_field=[value]' with the value read from the last record in the response | |
initial_url_args = 'page={}'.format(offset) | |
# set to True for the initial REST API call | |
initial_call = True | |
# Create a batch | |
cur_batch = sdc.createBatch() | |
# We'll set "done" to True when we reach the last page of the response or if we get an error | |
done = False | |
# index each record | |
record_index = 0 | |
while not done: | |
try: | |
# Construct the URL to be called | |
if initial_call: | |
current_url = '{}&{}'.format('${BASE_REST_API_URL}', initial_url_args) | |
initial_call = False | |
else: | |
current_url = next_page_url | |
# Call the REST API | |
if DEBUG: | |
sdc.log.info("SCRIPTING_ORIGIN URL: {}".format(current_url)) | |
response = session.get(current_url) | |
# Sleep between calls to the API so we don't get rate-limited | |
time.sleep(${SLEEP_SECONDS}) | |
# Get the response data | |
if response_list_field == '': | |
response_data = response.json() | |
else: | |
response_data = response.json()[response_list_field] | |
# Exit if there is no data | |
if response_data is None or len(response_data) == 0: | |
done = True | |
else: | |
# Get the number of records in the response so | |
# we can tell when we're processing the last message | |
num_records_in_response = len(response_data) | |
response_record_index = 0 | |
# Handle each record in the response | |
for response_record in response_data: | |
# Bump the index | |
response_record_index += 1 | |
# Create a record for each result record | |
record = sdc.createRecord('record created ' + str(datetime.datetime.now()) + str(record_index)) | |
record_index += 1 | |
# Assign the item to the record | |
record.value = {} | |
record.value['data'] = response_record | |
# See if we are on the last record of the response | |
# Use this section to capture a last read value of a field | |
# for example, in order to construct the next_page_url below | |
# rather than using a page number. | |
# For example, if we are on the last record: | |
if response_record_index == len(response_data): | |
last_id_in_response = response_record['_id'] | |
# Add the record to the batch | |
cur_batch.add(record) | |
# if the batch is full, process it and start a new one | |
if cur_batch.size() >= sdc.batchSize: | |
# blocks until all records are written to all destinations | |
# and updates offset in accordance with delivery guarantee | |
cur_batch.process(entity_name, str(offset)) | |
cur_batch = sdc.createBatch() | |
# Bump the offset. | |
# As this example uses page numbers we just add 1, but you | |
# could instead assign a value captured from the last record | |
# of the batch above | |
offset += 1 | |
# Create the next page URL | |
# This example uses 'page=value' but you could use | |
# 'my_custom_field=[value]' with the value read from the last record in the response | |
next_page_url = '{}&page={}'.format('${BASE_REST_API_URL}', offset) | |
# Gracefully end the script if the pipeline has been stopped. | |
if sdc.isStopped(): | |
done = True | |
except Exception as e: | |
cur_batch.addError(record, str(e)) | |
cur_batch.process(entity_name, str(offset)) | |
done = True | |
if cur_batch.size() + cur_batch.errorCount() + cur_batch.eventCount() > 0: | |
cur_batch.process(entity_name, str(offset)) |
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.