Last active
June 28, 2021 14:53
-
-
Save onefoursix/3a5777502085bf32c3ed61a2e85d5ea3 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import time | |
# Conver the pipeline parameter JOB_STATUS_CHANGE_SUPPRESS_DUPES_MINUTES from minutes to seconds | |
SUPPRESS_DUPE_NOTIFICATIONS_SECONDS = ${JOB_STATUS_CHANGE_SUPPRESS_DUPES_MINUTES} * 60 | |
for record in sdc.records: | |
try: | |
# Assume we will forward this notification to the endpoint targets | |
send_notification = 'true' | |
# Get the current time in seconds | |
curr_time_seconds = int(round(time.time())) | |
# Get the notification payload | |
payload = record.value['notification_payload'] | |
# Create a unique key for events of a specifc type for a specific Job ID | |
unique_event_key = payload['JOB_ID'] + payload['FROM_COLOR'] + payload['FROM_STATUS'] + payload['TO_COLOR'] + payload['TO_STATUS'] | |
# If we have never seen this event before, add the event key to the cache with a timestamp | |
if not unique_event_key in sdc.state['cache'].keys(): | |
sdc.state['cache'][unique_event_key] = curr_time_seconds | |
# If we have seen this event before | |
else: | |
# Get the timestamp of when we saw this event from the cache | |
cached_notification_time_for_event = sdc.state['cache'][unique_event_key] | |
# If we have already seen this same event within the dupe window | |
if cached_notification_time_for_event + SUPPRESS_DUPE_NOTIFICATIONS_SECONDS > curr_time_seconds: | |
# Suppress this event | |
send_notification = 'false' | |
else: | |
# Update the timestamp for this event in the cache | |
sdc.state['cache'][unique_event_key] = curr_time_seconds | |
# Format the notification from the template | |
notification = sdc.state['notification_template'].format( | |
payload['JOB_NAME'], | |
payload['JOB_ID'], | |
payload['TRIGGERED_ON'], | |
payload['FROM_COLOR'], | |
payload['FROM_STATUS'], | |
payload['TO_COLOR'], | |
payload['TO_STATUS'] | |
) | |
# Append the Error Message if it exists | |
if 'ERROR_MESSAGE' in payload.keys() and len(payload['ERROR_MESSAGE']) > 0: | |
notification += " Error Message: {}".format(payload['ERROR_MESSAGE']) | |
# Clear the record | |
record.value = {} | |
# Set the notification text | |
record.value['text'] = notification | |
# Set the send_notification attribute | |
record.attributes['send_notification'] = send_notification | |
# Write the record | |
sdc.output.write(record) | |
except Exception as e: | |
sdc.error.write(record, str(e)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment