Skip to content

Instantly share code, notes, and snippets.

@onefoursix
Last active June 28, 2021 14:53
Show Gist options
  • Save onefoursix/3a5777502085bf32c3ed61a2e85d5ea3 to your computer and use it in GitHub Desktop.
Save onefoursix/3a5777502085bf32c3ed61a2e85d5ea3 to your computer and use it in GitHub Desktop.
import json
import time
# Conver the pipeline parameter JOB_STATUS_CHANGE_SUPPRESS_DUPES_MINUTES from minutes to seconds
SUPPRESS_DUPE_NOTIFICATIONS_SECONDS = ${JOB_STATUS_CHANGE_SUPPRESS_DUPES_MINUTES} * 60
for record in sdc.records:
try:
# Assume we will forward this notification to the endpoint targets
send_notification = 'true'
# Get the current time in seconds
curr_time_seconds = int(round(time.time()))
# Get the notification payload
payload = record.value['notification_payload']
# Create a unique key for events of a specifc type for a specific Job ID
unique_event_key = payload['JOB_ID'] + payload['FROM_COLOR'] + payload['FROM_STATUS'] + payload['TO_COLOR'] + payload['TO_STATUS']
# If we have never seen this event before, add the event key to the cache with a timestamp
if not unique_event_key in sdc.state['cache'].keys():
sdc.state['cache'][unique_event_key] = curr_time_seconds
# If we have seen this event before
else:
# Get the timestamp of when we saw this event from the cache
cached_notification_time_for_event = sdc.state['cache'][unique_event_key]
# If we have already seen this same event within the dupe window
if cached_notification_time_for_event + SUPPRESS_DUPE_NOTIFICATIONS_SECONDS > curr_time_seconds:
# Suppress this event
send_notification = 'false'
else:
# Update the timestamp for this event in the cache
sdc.state['cache'][unique_event_key] = curr_time_seconds
# Format the notification from the template
notification = sdc.state['notification_template'].format(
payload['JOB_NAME'],
payload['JOB_ID'],
payload['TRIGGERED_ON'],
payload['FROM_COLOR'],
payload['FROM_STATUS'],
payload['TO_COLOR'],
payload['TO_STATUS']
)
# Append the Error Message if it exists
if 'ERROR_MESSAGE' in payload.keys() and len(payload['ERROR_MESSAGE']) > 0:
notification += " Error Message: {}".format(payload['ERROR_MESSAGE'])
# Clear the record
record.value = {}
# Set the notification text
record.value['text'] = notification
# Set the send_notification attribute
record.attributes['send_notification'] = send_notification
# Write the record
sdc.output.write(record)
except Exception as e:
sdc.error.write(record, str(e))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment