from airflow.models import Variable | |
from boto.s3.key import Key | |
from collections import OrderedDict | |
from datetime import date, datetime, time, timedelta | |
from email.mime.multipart import MIMEMultipart | |
from email.mime.text import MIMEText | |
from slackclient import SlackClient | |
from subprocess import Popen, PIPE, STDOUT | |
import base64 | |
import boto | |
import boto.s3 | |
import boto.sqs | |
import boto.sns | |
import itertools | |
import json | |
import logging | |
import logging.handlers | |
import os | |
import pprint | |
import psycopg2 | |
import random | |
import re | |
import requests | |
import shutil | |
import smtplib | |
import sys | |
import tempfile | |
import time | |
import urllib | |
import uuid | |
# Set up logging | |
logger = logging.getLogger('ep_telemetry_pipeline_utils') | |
logger.setLevel(logging.DEBUG) | |
handler = logging.handlers.SysLogHandler(address='/dev/log') | |
logger.addHandler(handler) | |
# Get the environment - Either EP_STAGE or EP_PROD | |
ENV = Variable.get("ENV") | |
if not ENV: | |
ENV = 'Local Machine' | |
def str_to_bool(s): | |
return s in ['True', 'true'] | |
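# Illustrative examples (not part of the original module): only the exact strings
# 'True' and 'true' are treated as truthy, so str_to_bool('True') -> True,
# str_to_bool('true') -> True, while str_to_bool('TRUE') or str_to_bool('1') -> False.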
def _is_prod(): | |
logger.info("----- ENV : {}".format(ENV)) | |
if ENV == "EP_STAGE": | |
logger.info("----- ENV: is NOT PROD ") | |
return False | |
else: | |
logger.info("----- ENV: is PROD ") | |
return True | |
# LOAD VARIABLES | |
import_sqs_queue_name=Variable.get('ep_pipeline_import_sqs_queue_name').strip() | |
import_ep_pipeline_alerting_queue_name=Variable.get('ep_pipeline_alerting_queue_name').strip() | |
region = Variable.get('region').strip() | |
victorops_key=Variable.get('victorops_key').strip() | |
ep_ops_slack_channel_name=Variable.get('ep_ops_slack_channel_name').strip() | |
slack_api_token=Variable.get('slack_api_token').strip() | |
import_sns_topic_arn_for_job_status=Variable.get('ep_pipeline_sns_topic_for_job_status').strip() | |
import_ep_db_connect_string =Variable.get('ep_pipeline_db_connect_string').strip() | |
import_ep_pipeline_alert_email_dl = Variable.get("ep_pipeline_alert_email_dl").strip() | |
import_ep_pipeline_success_email_dl = Variable.get("ep_pipeline_success_email_dl").strip() | |
import_ep_pipeline_success_email_from = Variable.get("email_from").strip() | |
import_ep_pipeline_mandrill_creds_dict = json.loads(Variable.get("ep_pipeline_mandrill_creds").strip()) | |
import_spark_cluster_ip=Variable.get('ep_pipeline_spark_cluster_ip').strip() | |
import_ep_pipeline_model_build_spark_cluster_ip=Variable.get('ep_pipeline_model_build_spark_cluster_ip').strip() | |
import_airflow_agg_bucket_name=Variable.get('ep_pipeline_agg_bucket_name').strip() | |
import_airflow_enable_notifications=str_to_bool(Variable.get('ep_pipeline_enable_notifications').strip()) | |
import_airflow_priority_weight=int(Variable.get('ep_pipeline_priority_weight').strip()) | |
import_airflow_importer_failures_bucket_name=Variable.get('ep_pipeline_importer_failures_bucket_name').strip() | |
import_airflow_importer_metadata_bucket_name=Variable.get('ep_pipeline_importer_metadata_bucket_name').strip() | |
import_airflow_s3_collector_ingest_bucket_name=Variable.get('ep_pipeline_s3_collector_ingest_bucket_name').strip() | |
import_airflow_granularity_secs = int(Variable.get('ep_pipeline_granularity_secs').strip()) | |
import_airflow_collector_ingest_delay_secs = int(Variable.get('ep_pipeline_collector_ingest_delay_secs').strip()) | |
import_ep_pipeline_model_build_in_hourly = str_to_bool(Variable.get('ep_pipeline_model_build_in_hourly').strip()) | |
import_ep_terminate_emr_cluster = str_to_bool(Variable.get('ep_terminate_emr_cluster').strip()) | |
import_ep_pipeline_victorops_alerting = str_to_bool(Variable.get('ep_pipeline_victorops_alerting').strip()) | |
import_ep_pipeline_aggregation_timeout = float(Variable.get('ep_pipeline_aggregation_timeout').strip()) | |
import_airflow_start_date_as_lookback_days=Variable.get('ep_pipeline_start_date_as_lookback_days').strip() | |
import_aggregate_lookback_days=Variable.get('ep_pipeline_aggregate_lookback_days').strip() | |
healthchecks_io_url=Variable.get('healthchecks_io_url').strip() | |
CLEAR_LOGS = str_to_bool(Variable.get('ep_pipeline_delete_spark_logs').strip()) | |
PLATFORM = Variable.get('ep_platform_telemetry_pipeline').strip().lower() | |
PLATFORM_VARS = Variable.get('ep_spark_ssh_config', deserialize_json=True)[PLATFORM] | |
SSH_KEY = Variable.get('ep_platform_ssh_keys', deserialize_json=True)[PLATFORM] | |
SLAVES = [ slave.strip() for slave in PLATFORM_VARS['slaves'] ] | |
import_ep_pipeline_discrepancy_alerting_config = Variable.get("ep_pipeline_discrepancy_alerting_config", deserialize_json=True) | |
import_ep_ops_slack_alerting_enabled = str_to_bool(Variable.get("ep_ops_slack_alerting_enabled").strip()) | |
import_ep_pipeline_vacuum_analyze_scheduled_hour=int(Variable.get('ep_pipeline_vacuum_analyze_scheduled_hour').strip()) | |
pp = pprint.PrettyPrinter(indent=4) | |
# DEFINE CONSTANTS | |
ISO_8601_DATE_FORMAT = '%Y-%m-%d' | |
ISO_8601_DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S" | |
MAX_NUM_OF_DLQ_MESSAGES_TO_DISPLAY = 5 | |
MODEL_BUILDING_STALENESS_CRITERIA_SECONDS = 24 * 3600 | |
AIRFLOW_INPUT_DATE_FORMAT = ISO_8601_DATE_TIME_FORMAT | |
#AIRFLOW_INPUT_DATE_FORMAT = '%Y-%m-%d' | |
COLLECTOR_VALIDATE_DATE_FORMAT = '%Y%m%d_%H' | |
SPARK_DATE_FORMAT = '%Y%m%d_%H0000' | |
_MANDRILL_API_ENDPOINT = 'smtp.mandrillapp.com' | |
_SLEEP_INTERVAL_SECONDS = 10 | |
_MAX_WAIT_TIME_SECONDS_FOR_NEW_SQS_MESSAGE = _SLEEP_INTERVAL_SECONDS * 30 | |
VACUUM_ANALYZE_RIA_QUERY= '''VACUUM ANALYZE VERBOSE receiver_ip_aggregate;''' | |
VACUUM_ANALYZE_RDA_QUERY= '''VACUUM ANALYZE VERBOSE receiver_domain_aggregate;''' | |
VACUUM_ANALYZE_MSG_QUERY= '''VACUUM ANALYZE VERBOSE message;''' | |
VACUUM_ANALYZE_MSG_AGG_QUERY= '''VACUUM ANALYZE VERBOSE message_aggregate;''' | |
VACUUM_ANALYZE_DISC_METRICS_QUERY= '''VACUUM ANALYZE VERBOSE telemetry_data_discrepancy;''' | |
DELETE_RIA_QUERY = '''delete from receiver_ip_aggregate ria where ts >= '%s' and ts < '%s';''' | |
DELETE_M_QUERY = '''delete from message where ts >= '%s' and ts < '%s';''' | |
DELETE_RDA_QUERY = '''delete from receiver_domain_aggregate where ts >= '%s' and ts < '%s';''' | |
DELETE_MESSAGE_AGG_QUERY = '''DELETE FROM message_aggregate WHERE ts >= '%s' AND ts < '%s';''' | |
DELETE_TELEMETRY_DATA_DISC_QUERY = '''DELETE FROM telemetry_data_discrepancy WHERE execution_date >= '%s' AND execution_date < '%s';''' | |
_MESSAGE_COUNT_QUERY="SELECT count(*) from message where ts >= '%s' and ts < '%s';" | |
ALL_ORGS_QUERY = '''SELECT id from organization WHERE not deleted AND ingest_enabled AND (expires_at IS NULL OR expires_at > now());''' | |
GET_ORGS_INGEST_QUERY = '''SELECT id, ingest_enabled from organization WHERE not deleted AND (expires_at IS NULL OR expires_at > now());''' | |
# RDA/Message by Org and count | |
_MESSAGE_ORG_HISTO_QUERY='''SELECT organization_id "org", count(*) from message where ts >= '%s' and ts < '%s' group by org order by org;''' | |
_NRT_MESSAGE_ORG_HISTO_QUERY="SELECT organization_id \"org\", count(*) FROM nrt_message WHERE ts >= '%s' AND ts < '%s' GROUP BY org ORDER BY org" | |
_RDA_ORG_HISTO_QUERY='''SELECT organization_id "org", count(*) from receiver_domain_aggregate where ts >= '%s' and ts < '%s' group by org order by org;''' | |
# RDA/Message by day and count | |
_MESSAGE_HISTO_QUERY='''SELECT date_trunc('day', ts) "day", count(*) from message where ts >= '%s' and ts < '%s' group by day order by day desc;''' | |
_NRT_MESSAGE_HISTO_QUERY='''SELECT date_trunc('day', ts) "day", count(*) FROM nrt_message WHERE ts > '%s' AND ts < '%s' GROUP BY day ORDER BY day DESC''' | |
_RDA_HISTO_QUERY='''SELECT date_trunc('day', ts) "day", count(*) from receiver_domain_aggregate where ts >= '%s' and ts < '%s' group by day order by day desc;''' | |
_EYE_CANDY_LIST = ['http://vignette3.wikia.nocookie.net/looneytunes/images/4/47/Speedy_Gonzales.jpg/revision/latest?cb=20060220031648', | |
'https://s-media-cache-ak0.pinimg.com/originals/99/38/75/9938756c5636458f6bb553291274de65.jpg', | |
'https://s-media-cache-ak0.pinimg.com/736x/1e/a1/f1/1ea1f101d9d562ef428211eb53233cae.jpg', | |
'http://foghornleghornquotes.com/wp-content/uploads/Foghorn-on-the-Farm.jpg', | |
'http://images6.fanpop.com/image/photos/33200000/marvin-the-martian-looney-tunes-33226447-403-373.jpg', | |
'http://img.veehd.com/3154746_l2.jpg', | |
'http://thelivingstonpost.com/wp-content/uploads/2012/10/wile_e_coyote_adhesive_vinyl_decal_sticker5__59675.jpg', | |
'http://i.ytimg.com/vi/VEFmFMeXV3E/hqdefault.jpg', | |
'http://m.cdn.blog.hu/vi/vilagevo/image/nyc/brklyn/news_photo_56969_1409171882.jpg', | |
'http://www.sitcomsonline.com/photopost/data/3429/1foghorn_leghorn-5230.jpg', | |
'https://d1u1p2xjjiahg3.cloudfront.net/53ead210-1e5b-41b9-bfde-456bdd021e49.jpg', | |
'http://randomville.com/Image/music/music_features/MK1goodman04_RabbitOfSeville.jpg', | |
'http://media.animevice.com/uploads/1/11232/634659-287054_72477_bugs_bunny_super_super.jpg', | |
'https://s-media-cache-ak0.pinimg.com/originals/82/09/66/8209663a65616013a280317df0c216c2.gif', | |
'http://www.cbc.ca/allinaday/images/littlemermaid.jpg', | |
] | |
SPARK_CLUSTER_URI = "spark://{}:7077".format(import_spark_cluster_ip) | |
# FUNCTIONS | |
def compute_end_dt(start_dt, task_instance): | |
""" | |
Computes end_dt from start_dt | |
""" | |
return start_dt+timedelta(seconds=import_airflow_granularity_secs) | |
def sla_alert_func(dag, task_list, blocking_task_list, slas, blocking_tis): | |
logger.info('Executing SLA miss callback') | |
message_type = 'info' | |
if ENV == 'EP_PROD' and import_ep_pipeline_victorops_alerting: | |
message_type = 'critical' | |
# Execute VictorOps alerting | |
message_name = "{env} - {dag_name} SLA Miss for task `{task_list}`".format(**{ | |
'dag_name': dag.dag_id, | |
'task_list': task_list, | |
'env': ENV, | |
}) | |
userdata ={"message_type": message_type, | |
"state_message": message_name, | |
"monitoring_tool": "airflow", | |
"entity_id":"airflow/telemetry/SLA_miss"} | |
requests.post('https://alert.victorops.com/integrations/generic/20131114/alert/{}/EP'.format(victorops_key), data=json.dumps(userdata)) | |
# Execute Slack alerting | |
report_issues_on_slack_util(message_name) | |
def wait_for_collector_ingest(ds, **kwargs): | |
# Sleep unless this is being run after the first n mins of the hour | |
cur_minute_of_hour = datetime.now().time().minute | |
cur_second_of_minute = datetime.now().time().second | |
cur_second_of_hour = (cur_minute_of_hour * 60) + cur_second_of_minute | |
if cur_second_of_hour < import_airflow_collector_ingest_delay_secs: | |
# Sleep the remainder of time | |
remainder_secs = import_airflow_collector_ingest_delay_secs - cur_second_of_hour | |
logger.info("wait_for_collector_ingest : About to wait {} seconds for collector ingest to complete since cur_second_of_hour={}".format(remainder_secs, | |
cur_second_of_hour)) | |
        time.sleep(remainder_secs)
logger.info("wait_for_collector_ingest : Finished waiting {} seconds for collector ingest to complete".format(remainder_secs)) | |
def ping_healthchecks_io(ds, **kwargs): | |
if healthchecks_io_url: | |
requests.get(healthchecks_io_url) | |
# Purge DLQ if it has items on it | |
def purge_DLQ(ds, **kwargs): | |
import_sqs_dlq_name = "{}-{}".format(import_sqs_queue_name, "DLQ") | |
was_successful = purge_sqs_queue(import_sqs_dlq_name) | |
return was_successful | |
# Check for time to run vacuum analyze | |
def check_for_time_to_run_vacuum_analyze_condition(ds, **kwargs): | |
''' | |
Check whether it is time to run vacuum analyze | |
''' | |
dt_hour = kwargs['execution_date'].time().hour | |
if dt_hour == import_ep_pipeline_vacuum_analyze_scheduled_hour: | |
return True | |
else: | |
return False | |
# Run vacuum analyze | |
def run_vacuum_analyze(ds, **kwargs): | |
''' | |
Run vacuum analyze | |
''' | |
# Set the connection string based on the environment | |
db_conn_string = import_ep_db_connect_string | |
db_conn = psycopg2.connect(db_conn_string) | |
logger.info("----- Successfully Connected to database {}".format(db_conn_string)) | |
cursor = db_conn.cursor() | |
run_vacuum_analyze_utils(db_conn, cursor) | |
# close the cursor and connection | |
cursor.close() | |
db_conn.close() | |
return True | |
# Build models only if the nth hour! | |
def check_for_time_to_build_model_branch_condition(ds, **kwargs): | |
''' | |
Check whether it is time to build models | |
''' | |
if not import_ep_pipeline_model_build_in_hourly: | |
return 'prejoin_preagg_dummy_job' | |
dt_hour = kwargs['execution_date'].time().hour | |
if dt_hour == 6: | |
return 'build_sender_models_spark_job' | |
else: | |
return 'prejoin_preagg_dummy_job' | |
# Check model building is successful | |
def check_for_successful_model_building_branch_condition(ds, **kwargs): | |
''' | |
Check model building is successful | |
''' | |
successful = verify_model_building_successful() | |
if successful: | |
return 'delete_db_data' | |
else: | |
return 'send_sns_notification_model_building_failed' | |
def send_sns_notification_model_building_failed(ds, **kwargs): | |
# Return immediately if email is disabled | |
if not import_airflow_enable_notifications: | |
return | |
conn = boto.sns.connect_to_region(region) | |
    message = 'Airflow EP Data pipeline flow completed with failed model building in the {} environment for: {} GMT/UTC'.format(
        ENV, kwargs['execution_date'].strftime(AIRFLOW_INPUT_DATE_FORMAT))
conn.publish(message=message, subject='Airflow Flow Complete with failed model building', target_arn=import_sns_topic_arn_for_job_status, | |
) | |
logger.info("----- Sent SNS about FLOW Completion : model building failure") | |
return True | |
# Verify model building is successful | |
def verify_model_building_successful(): | |
return verify_model_building_successful_util(region, | |
import_airflow_importer_metadata_bucket_name) | |
# Verify model building is successful | |
def verify_model_building_successful_util(aws_region, metadata_bucket_name ): | |
''' | |
Check for the presence of certain files to ensure that model building is successful | |
e.g. Under appropriate metadata bucket, check | |
* activereputation/reputation-{epoch time}.json | |
* models/sendermodels-*.avro | |
      * cousin_domains/cousin_scores.json
''' | |
conn = boto.s3.connect_to_region(aws_region) | |
bucket = conn.get_bucket(metadata_bucket_name) | |
now_epoch_time = int(time.time()) | |
# Check the activereputation/reputation-{epoch time}.json | |
bucket_list = bucket.list(prefix='activereputation/reputation') | |
# TODO : This won't be acceptable once the list of keys grows into the tens of thousands | |
ordered_list = sorted(bucket_list, key=lambda k: k.last_modified, reverse=True) | |
most_recent_key = ordered_list[0] | |
epoch_search = re.search('activereputation/reputation-(.*).json', most_recent_key.key, re.IGNORECASE) | |
common_log_message = '----- verify_model_building_successful : check for recent activereputation/reputation file:' | |
if epoch_search: | |
rep_epoch_time = float(epoch_search.group(1)) | |
# don't run model building more than once a day | |
if rep_epoch_time < (now_epoch_time - MODEL_BUILDING_STALENESS_CRITERIA_SECONDS): | |
logger.info("{} {} Failed".format(common_log_message, most_recent_key.key)) | |
return False | |
else: | |
logger.info("{} {} Succeeded".format(common_log_message, most_recent_key.key)) | |
# Check the models/sendermodels-*.avro | |
bucket_list = bucket.list(prefix='models/sendermodels') | |
# TODO : This won't be acceptable once the list of keys grows into the tens of thousands | |
ordered_list = sorted(bucket_list, key=lambda k: k.last_modified, reverse=True) | |
most_recent_key = ordered_list[0] | |
epoch_search = re.search('models/sendermodels-(\d*\.\d*)\D*', most_recent_key.key, re.IGNORECASE) | |
common_log_message = '----- verify_model_building_successful : check for recent models/sendermodels file:' | |
if epoch_search: | |
sm_epoch_time = float(epoch_search.group(1)) | |
if sm_epoch_time < (now_epoch_time - MODEL_BUILDING_STALENESS_CRITERIA_SECONDS): | |
logger.info("{} {} Failed".format(common_log_message, most_recent_key.key)) | |
return False | |
else: | |
logger.info("{} {} Succeeded".format(common_log_message, most_recent_key.key)) | |
# Check the cousin_domains/cousin-scores.json | |
# time is returned as 'Mon, 02 Nov 2015 23:33:55 GMT' | |
cd_lmd = bucket.get_key('cousin_domains/cousin_scores.json').last_modified | |
lmd = datetime.strptime(cd_lmd, '%a, %d %b %Y %H:%M:%S %Z') | |
    # Cast the epoch string to a number so it can be compared against now_epoch_time below
    cd_epoch = float(lmd.strftime('%s'))
now_epoch_time = int(time.time()) | |
common_log_message = '----- verify_model_building_successful : check for recent cousin_domains/cousin_scores.json file:' | |
if cd_epoch < (now_epoch_time - MODEL_BUILDING_STALENESS_CRITERIA_SECONDS): | |
logger.info("{} Failed. LMD = {}".format(common_log_message, cd_lmd)) | |
return False | |
else: | |
logger.info("{} Succeeded. LMD = {}".format(common_log_message, cd_lmd)) | |
# If we get here, model building is successful | |
return True | |
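# Illustrative sketch only (not called by the pipeline): the staleness rule applied to each
# model artifact above, pulled out as a standalone helper. Both arguments are epoch seconds.
def _is_model_artifact_stale(artifact_epoch_seconds, now_epoch_seconds,
                             staleness_seconds=MODEL_BUILDING_STALENESS_CRITERIA_SECONDS):
    # An artifact is stale if it was produced more than `staleness_seconds`
    # before `now_epoch_seconds` (24 hours by default, matching the constant above).
    return artifact_epoch_seconds < (now_epoch_seconds - staleness_seconds)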
# Delete RDA, RIA, and Message records in the table for the target date range | |
def delete_db_data(ds, **kwargs): | |
''' | |
Delete data from the RDA, RIA, and Message tables | |
''' | |
# Format and compute dates for the time bounds | |
start_dt, end_dt = get_start_and_end_datetime(**kwargs) | |
# Delete data | |
delete_db_data_util(import_ep_db_connect_string, | |
start_dt, | |
end_dt) | |
return True | |
def delete_db_discrepancy_data(db_conn_string, execution_date): | |
''' | |
Delete data discrepancy | |
''' | |
# Get start and end dates | |
start_date, end_date = get_start_and_end_datetime(execution_date=execution_date) | |
# Set the connection string based on the environment | |
db_conn = psycopg2.connect(db_conn_string) | |
logger.info("----- Successfully Connected to database {}".format(db_conn_string)) | |
cursor = db_conn.cursor() | |
DELETE_TELEMETRY_DATA_DISC_QUERY_BOUND = DELETE_TELEMETRY_DATA_DISC_QUERY % (start_date, end_date) | |
# Execute queries | |
logger.info("----- Executing the following query against the db : {}".format(DELETE_TELEMETRY_DATA_DISC_QUERY_BOUND)) | |
cursor.execute(DELETE_TELEMETRY_DATA_DISC_QUERY_BOUND) | |
db_conn.commit() # close the transaction | |
# Always vacuum analyze | |
db_conn.autocommit = True | |
logger.info("----- Executing the following query against the db : {}".format(VACUUM_ANALYZE_DISC_METRICS_QUERY)) | |
cursor.execute(VACUUM_ANALYZE_DISC_METRICS_QUERY) | |
db_conn.autocommit = False | |
# close the cursor and connection | |
cursor.close() | |
db_conn.close() | |
return True | |
# Utility method to delete RDA, RIA, and Message records in the table for the target date range | |
def delete_db_data_util(db_conn_string, start_date, end_date, vacuum_analyze=False, reload_data=False): | |
''' | |
Delete data from the RDA, RIA, and Message tables | |
''' | |
# Set the connection string based on the environment | |
db_conn = psycopg2.connect(db_conn_string) | |
logger.info("----- Successfully Connected to database {}".format(db_conn_string)) | |
cursor = db_conn.cursor() | |
DELETE_RIA_QUERY_BOUND = DELETE_RIA_QUERY % (start_date, end_date) | |
DELETE_M_QUERY_BOUND = DELETE_M_QUERY % (start_date, end_date) | |
DELETE_RDA_QUERY_BOUND = DELETE_RDA_QUERY % (start_date, end_date) | |
# Execute queries | |
logger.info("----- Executing the following query against the db : {}".format(DELETE_RIA_QUERY_BOUND)) | |
cursor.execute(DELETE_RIA_QUERY_BOUND) | |
logger.info("----- Executing the following query against the db : {}".format(DELETE_M_QUERY_BOUND)) | |
cursor.execute(DELETE_M_QUERY_BOUND) | |
logger.info("----- Executing the following query against the db : {}".format(DELETE_RDA_QUERY_BOUND)) | |
cursor.execute(DELETE_RDA_QUERY_BOUND) | |
db_conn.commit() # close the transaction | |
# Do optional Vacuum Analyze | |
if vacuum_analyze: | |
run_vacuum_analyze_utils(db_conn, cursor) | |
# close the cursor and connection | |
cursor.close() | |
db_conn.close() | |
return True | |
# Function to execute vacuum analyze | |
def run_vacuum_analyze_utils(db_conn, cursor): | |
db_conn.autocommit = True | |
logger.info("----- Executing the following query against the db : {}".format(VACUUM_ANALYZE_RDA_QUERY)) | |
cursor.execute(VACUUM_ANALYZE_RDA_QUERY) | |
logger.info("----- Executing the following query against the db : {}".format(VACUUM_ANALYZE_RIA_QUERY)) | |
cursor.execute(VACUUM_ANALYZE_RIA_QUERY) | |
logger.info("----- Executing the following query against the db : {}".format(VACUUM_ANALYZE_MSG_QUERY)) | |
cursor.execute(VACUUM_ANALYZE_MSG_QUERY) | |
logger.info("----- Executing the following query against the db : {}".format(VACUUM_ANALYZE_MSG_AGG_QUERY)) | |
cursor.execute(VACUUM_ANALYZE_MSG_AGG_QUERY) | |
db_conn.autocommit = False | |
def check_for_sqs_message_branch_condition(ds, **kwargs): | |
conn = boto.sqs.connect_to_region(region) | |
logger.info("----- Connecting to SQS Queue : {}".format(import_sqs_queue_name)) | |
q = conn.get_queue(import_sqs_queue_name) | |
rs = q.get_messages() | |
found_message = True | |
start_time = datetime.now() | |
while rs is None or len(rs) <= 0: | |
elapsed_time_seconds = (datetime.now() - start_time).seconds | |
if elapsed_time_seconds >= _MAX_WAIT_TIME_SECONDS_FOR_NEW_SQS_MESSAGE: | |
found_message = False | |
            break
logger.info("----- WAITING FOR A MESSAGE on SQS Queue : {}".format(import_sqs_queue_name)) | |
time.sleep(_SLEEP_INTERVAL_SECONDS) | |
rs = q.get_messages() | |
if found_message: | |
logger.info("----- FOUND A MESSAGE on SQS Queue : {}".format(import_sqs_queue_name)) | |
return 'wait_for_new_data_in_db' | |
else: | |
logger.info("----- GAVE UP ON WAITING FOR A MESSAGE SQS Queue : {}".format(import_sqs_queue_name)) | |
return 'send_sns_notification_no_spark_data' | |
def wait_for_empty_queue(ds, **kwargs): | |
conn = boto.sqs.connect_to_region(region) | |
q = conn.get_queue(import_sqs_queue_name) | |
q_size = q.count() | |
while q_size > 0: | |
logger.info("----- WAITING FOR Queue {} to Drain : Messages remaining : {}".format(import_sqs_queue_name, q_size)) | |
q = conn.get_queue(import_sqs_queue_name) | |
q_size = q.count() | |
time.sleep(_SLEEP_INTERVAL_SECONDS) | |
# Once the queue drains, we still have many messages in flight, especially if we have 20 ASG instances launched! | |
attr = q.get_attributes() | |
inflight_message_count = int(attr['ApproximateNumberOfMessagesNotVisible']) | |
while inflight_message_count > 0: | |
logger.info("----- WAITING FOR Queue {} to Drain : In-flight Messages remaining : {}".format(import_sqs_queue_name, | |
inflight_message_count)) | |
q = conn.get_queue(import_sqs_queue_name) | |
attr = q.get_attributes() | |
inflight_message_count = int(attr['ApproximateNumberOfMessagesNotVisible']) | |
time.sleep(_SLEEP_INTERVAL_SECONDS) | |
logger.info("----- Queue {} Empty".format(import_sqs_queue_name)) | |
return True | |
# Simplified version of the db data wait method that
# only checks that the row count is > 0
def wait_for_new_data_in_db_simple(ds, **kwargs): | |
db_conn = psycopg2.connect(import_ep_db_connect_string) | |
logger.info("----- Successfully Connected to database {}".format(import_ep_db_connect_string)) | |
cursor = db_conn.cursor() | |
# Establish a base line | |
start_dt, end_dt = get_start_and_end_datetime(**kwargs) | |
_MESSAGE_COUNT_QUERY_WITH_TS = _MESSAGE_COUNT_QUERY % (start_dt, end_dt) | |
logger.info("----- Executing the following query against the db : {}".format(_MESSAGE_COUNT_QUERY_WITH_TS)) | |
cursor.execute(_MESSAGE_COUNT_QUERY_WITH_TS) | |
result = cursor.fetchone() | |
initial_row_count = int(result[0]) | |
db_conn.commit() #close the transaction | |
logger.info("----- FOUND A NEW DATA in DB (Simplified) : (row count) = {}".format(initial_row_count)) | |
# close the cursor and connection | |
cursor.close() | |
db_conn.close() | |
return initial_row_count>0 | |
# Waits for new data in the db and expects to observe data being added
def wait_for_new_data_in_db(ds, **kwargs): | |
db_conn = psycopg2.connect(import_ep_db_connect_string) | |
logger.info("----- Successfully Connected to database {}".format(import_ep_db_connect_string)) | |
cursor = db_conn.cursor() | |
# Establish a base line | |
start_dt, end_dt = get_start_and_end_datetime(**kwargs) | |
_MESSAGE_COUNT_QUERY_WITH_TS = _MESSAGE_COUNT_QUERY % (start_dt, end_dt) | |
logger.info("----- Executing the following query against the db : {}".format(_MESSAGE_COUNT_QUERY_WITH_TS)) | |
cursor.execute(_MESSAGE_COUNT_QUERY_WITH_TS) | |
result = cursor.fetchone() | |
initial_row_count = int(result[0]) | |
db_conn.commit() #close the transaction | |
found_new_data = None | |
# Detect a change in the record count | |
while not found_new_data: | |
logger.info("----- WAITING FOR A NEW DATA in DB : base count = {}".format(initial_row_count)) | |
time.sleep(_SLEEP_INTERVAL_SECONDS) | |
cursor.execute(_MESSAGE_COUNT_QUERY_WITH_TS) | |
result = cursor.fetchone() | |
new_row_count = int(result[0]) | |
if(new_row_count > initial_row_count): | |
found_new_data = True | |
db_conn.commit() #close the transaction | |
logger.info("----- FOUND A NEW DATA in DB : (base count ==> new count) = ({} ==> {})".format(initial_row_count, new_row_count)) | |
# close the cursor and connection | |
cursor.close() | |
db_conn.close() | |
return True | |
def send_sns_notification_no_spark_data(ds, **kwargs): | |
# Return immediately if email is disabled | |
if not import_airflow_enable_notifications: | |
return | |
date_format = AIRFLOW_INPUT_DATE_FORMAT | |
conn = boto.sns.connect_to_region(region) | |
message = 'Airflow EP Data pipeline flow completed with no new Spark data in the {} environment for : {} GMT/UTC'.format(ENV, | |
kwargs['execution_date'].strftime(date_format)) | |
conn.publish(message=message, subject='Airflow Flow Complete with No New Data', target_arn=import_sns_topic_arn_for_job_status, | |
) | |
logger.info("----- Sent SNS about FLOW Completion : no new spark data") | |
return True | |
def get_record_org_histos_for_tables(execution_date): | |
# Establish a base line | |
start_dt, end_dt = get_start_and_end_datetime(execution_date=execution_date) | |
date_format = AIRFLOW_INPUT_DATE_FORMAT | |
start_date_string = start_dt.strftime(date_format) | |
end_date_string = end_dt.strftime(date_format) | |
_RDA_ORG_HISTO_QUERY_WITH_TS = _RDA_ORG_HISTO_QUERY % (start_date_string, end_date_string) | |
_MESSAGE_ORG_HISTO_QUERY_WITH_TS = _MESSAGE_ORG_HISTO_QUERY % (start_date_string, end_date_string) | |
_NRT_MESSAGE_ORG_HISTO_QUERY_WITH_TS = _NRT_MESSAGE_ORG_HISTO_QUERY % (start_date_string, end_date_string) | |
logger.info("----- Executing the following queries against the db : {} and {} and {}".format(_RDA_ORG_HISTO_QUERY_WITH_TS, | |
_MESSAGE_ORG_HISTO_QUERY_WITH_TS, | |
_NRT_MESSAGE_ORG_HISTO_QUERY_WITH_TS)) | |
return get_record_histos_for_tables(_RDA_ORG_HISTO_QUERY_WITH_TS, | |
'RDA_ORG_HISTO', | |
_MESSAGE_ORG_HISTO_QUERY_WITH_TS, | |
'MESSAGE_ORG_HISTO', | |
_NRT_MESSAGE_ORG_HISTO_QUERY_WITH_TS, | |
'NRT_MESSAGE_ORG_HISTO') | |
def get_record_date_histos_for_tables(start_dt, end_dt, reload_data=False): | |
if reload_data: | |
# For queries, we need to cap at 1 day above with "<" condition | |
end_date = end_dt + timedelta(days=1) | |
else: | |
# For queries, we need to cap at 1 hour above with "<" condition | |
end_date = end_dt + timedelta(hours=1) | |
message_query = _MESSAGE_HISTO_QUERY % (start_dt, end_date) | |
rda_query = _RDA_HISTO_QUERY % (start_dt, end_date) | |
nrt_message_query = _NRT_MESSAGE_HISTO_QUERY % (start_dt, end_date) | |
return get_record_histos_for_tables(rda_query, | |
'RDA_HISTO', | |
message_query, | |
'MESSAGE_HISTO', | |
nrt_message_query, | |
'NRT_MESSAGE_HISTO') | |
def get_record_histos_for_tables(rda_histo_query, | |
rda_histo_key_name, | |
message_histo_query, | |
message_histo_key_name, | |
nrt_message_histo_query, | |
nrt_message_histo_key_name): | |
''' | |
This returns RDA/Message histos | |
''' | |
db_conn = psycopg2.connect(import_ep_db_connect_string) | |
logger.info("----- Successfully Connected to database {}".format(import_ep_db_connect_string)) | |
# Open a cursor | |
cursor = db_conn.cursor() | |
# Create the queries | |
logger.info("----- Executing the following queries against the db : {} & {}".format(message_histo_query, | |
rda_histo_query)) | |
# Execute the queries - first message histo count | |
cursor.execute(message_histo_query) | |
message_histo_records = cursor.fetchall() | |
db_conn.commit() #close the transaction | |
# Execute the queries - next rda histo count | |
cursor.execute(rda_histo_query) | |
rda_histo_records = cursor.fetchall() | |
db_conn.commit() #close the transaction | |
# nrt_message histo count | |
cursor.execute(nrt_message_histo_query) | |
nrt_message_histo_records = cursor.fetchall() | |
db_conn.commit() | |
# close the cursor and connection | |
cursor.close() | |
db_conn.close() | |
return {message_histo_key_name:message_histo_records, | |
rda_histo_key_name:rda_histo_records, | |
nrt_message_histo_key_name:nrt_message_histo_records} | |
# Get collector histos from s3 for validation | |
def get_collector_histos(**kwargs): | |
# Get the list of ingest disabled orgs to skip for non-prod envs | |
ingest_disabled_orgs = set() | |
if not _is_prod(): | |
orgs_ingest_lookup_pre_run = kwargs['task_instance'].xcom_pull(task_ids='discover_ingest_enabled_orgs') | |
for key, pre_value in orgs_ingest_lookup_pre_run.iteritems(): | |
if not pre_value: | |
ingest_disabled_orgs.add(key) | |
# Get collector file counts | |
#collector_dict = get_collector_file_message_counts_per_org(ingest_disabled_orgs, | |
# kwargs['execution_date']) | |
collector_dict = get_collector_file_message_counts_per_org(kwargs['execution_date']) | |
converted_dict = {} | |
for k,v in collector_dict.iteritems(): | |
converted_dict[long(k)] = long(v) | |
return converted_dict | |
# Common logic to the agg_avro and agg_avro_skipped histo methods | |
def agg_avro_histos_common(execution_date, suffix): | |
conn = boto.s3.connect_to_region(region) | |
bucket = conn.get_bucket(import_airflow_agg_bucket_name) | |
# Compute the agg start and end dates using the lookback | |
# NOTE : params.import_aggregate_lookback_days is non-positive | |
start_dt, end_dt = get_start_and_end_datetime(execution_date=execution_date) | |
agg_start_date_epoch = start_dt.strftime(SPARK_DATE_FORMAT) | |
agg_end_date_epoch = end_dt.strftime(SPARK_DATE_FORMAT) | |
logger.info('agg_avro_histos_common Stats located at domain_aggregate-{}-{}'.format(agg_start_date_epoch, | |
agg_end_date_epoch)) | |
rs = bucket.list("domain_aggregate-{}-{}".format(agg_start_date_epoch, | |
agg_end_date_epoch)) | |
# Since there may be many runs, find the most recent run | |
most_recent_lmd = None | |
most_recent_key = None | |
for key in rs: | |
if key.key.endswith(suffix): | |
if not most_recent_lmd or most_recent_lmd < key.last_modified: | |
most_recent_lmd = key.last_modified | |
most_recent_key = key | |
return (most_recent_lmd, most_recent_key) | |
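# Illustrative example (dates are hypothetical): for an execution window of
# 2016-04-29 06:00 to 07:00 UTC and a one-hour granularity, the loop above scans S3 keys
# under the prefix 'domain_aggregate-20160429_060000-20160429_070000' and returns the
# last-modified key whose name ends with the given suffix (e.g. 'summary.json').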
# Get agg_avro histos from s3 for validation | |
def get_agg_avro_histos(execution_date): | |
most_recent_lmd, most_recent_key = agg_avro_histos_common(execution_date, | |
"summary.json") | |
summary = None | |
if most_recent_key: | |
summary = json.loads(most_recent_key.get_contents_as_string()) | |
# We want { 1: 11534, 6: 87200, 7: 16446 } | |
org_to_message_count_dict = dict() | |
    if summary and 'aggregated_counts_by_org_id' in summary:
filtered_by_org_id_dict = summary['aggregated_counts_by_org_id'] | |
# At this point, we have {u'12': 0, u'21': 0, u'1': 2713, u'5': 0, u'7': 2181, u'6': 0} | |
for key in filtered_by_org_id_dict: | |
org_to_message_count_dict[int(key)] = filtered_by_org_id_dict[key] | |
# Print out the contents | |
logger.info('Agg Avro Summary Stats located at %s' % (most_recent_key)) | |
logger.info("Agg Avro Summary Stats : org_to_message_count_dict = %s" % org_to_message_count_dict ) | |
return org_to_message_count_dict | |
def define_message_alert_job(org_id, start_dt, interval): | |
job_id = str(uuid.uuid4()) | |
job = {'job_class': 'MessageAlertJob', | |
'job_id': job_id, | |
'arguments': [org_id, start_dt, interval]} | |
return (job_id, json.dumps(job), 0) | |
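# Illustrative example (values are hypothetical): define_message_alert_job(7, '2016-04-29T06:00:00', 3600)
# returns a (message_id, message_body, delay_seconds) tuple suitable for boto's Queue.write_batch, e.g.
#   ('6f1c...-uuid', '{"job_class": "MessageAlertJob", "job_id": "6f1c...-uuid",
#                     "arguments": [7, "2016-04-29T06:00:00", 3600]}', 0)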
# Enqueue a job over SQS to check for Alerts | |
def enqueue_alerting_jobs(ds, **kwargs): | |
db_conn = psycopg2.connect(import_ep_db_connect_string) | |
logger.info("----- Successfully Connected to database {}".format(import_ep_db_connect_string)) | |
cursor = db_conn.cursor() | |
logger.info("----- Executing the following query against the db : {}".format(ALL_ORGS_QUERY)) | |
cursor.execute(ALL_ORGS_QUERY) | |
org_ids = [r[0] for r in cursor.fetchall()] | |
cursor.close() | |
db_conn.close() | |
start_dt = kwargs['execution_date'].strftime(ISO_8601_DATE_TIME_FORMAT) | |
interval = int(import_airflow_granularity_secs) | |
jobs = map(lambda org: define_message_alert_job(org, start_dt, interval), org_ids) | |
job_batches = [jobs[i:i + 10] for i in range(0, len(jobs), 10)] | |
conn = boto.sqs.connect_to_region(region) | |
logger.info("----- Writing MessageAlertJobs to queue {}".format(import_ep_pipeline_alerting_queue_name)) | |
q = conn.get_queue(import_ep_pipeline_alerting_queue_name) | |
for batch in job_batches: | |
logger.info("----- Enqueuing job batch for orgs :{}".format(batch)) | |
status = q.write_batch(batch) | |
if status.errors: | |
logger.error("Batch failures {}".format(status.errors)) | |
# Get agg_avro skipped by skip type histos from s3 for validation | |
def get_agg_avro_skipped_by_type_histos(execution_date): | |
most_recent_lmd, most_recent_key = agg_avro_histos_common(execution_date, | |
"summary.json") | |
# Set the summary contents from the most recent key | |
summary = None | |
if most_recent_key: | |
summary = json.loads(most_recent_key.get_contents_as_string()) | |
# We want { 1: {"internal": 30}, 14: {"internal": 31737, "invalid_hdr_from": 1, "invalid_ip": 2}} | |
org_to_skip_type_to_count_dict = dict() | |
    if summary and 'skip_type_counts' in summary:
skip_counts_by_org_and_skip_type_dict = summary['skip_type_counts'] | |
# At this point, we have {"14": {"internal": 31737, "invalid_hdr_from": 1, "invalid_ip": 2}} | |
for key in skip_counts_by_org_and_skip_type_dict: | |
org_to_skip_type_to_count_dict[int(key)] = skip_counts_by_org_and_skip_type_dict[key] | |
# Print out the contents | |
logger.info('Agg Avro Skipped Stats located at %s' % (most_recent_key)) | |
logger.info("Agg Avro Skipped Stats : org_to_skip_type_to_count_dict = %s" % org_to_skip_type_to_count_dict) | |
return org_to_skip_type_to_count_dict | |
# Get agg_avro skipped histos from s3 for validation | |
def get_agg_avro_skipped_histos(execution_date): | |
most_recent_lmd, most_recent_key = agg_avro_histos_common(execution_date, | |
"summary.json") | |
# Set the summary contents from the most recent key | |
summary = None | |
if most_recent_key: | |
summary = json.loads(most_recent_key.get_contents_as_string()) | |
# We want { 1: 11534, 6: 87200, 7: 16446 } | |
org_to_message_count_dict = dict() | |
    if summary and 'deltas_by_org_id' in summary and 'filtered_by_org_id' in summary['deltas_by_org_id']:
        filtered_by_org_id_dict = summary['deltas_by_org_id']['filtered_by_org_id']
# At this point, we have {u'12': 0, u'21': 0, u'1': 2713, u'5': 0, u'7': 2181, u'6': 0} | |
for key in filtered_by_org_id_dict: | |
org_to_message_count_dict[int(key)] = filtered_by_org_id_dict[key] | |
# Print out the contents | |
logger.info('Agg Avro Skipped Stats located at %s' % (most_recent_key)) | |
logger.info("Agg Avro Skipped Stats : org_to_message_count_dict = %s" % org_to_message_count_dict ) | |
return org_to_message_count_dict | |
# Collect DLQ Stats and information | |
def collects_DLQ_stats_simple(): | |
# Get the DLQ Count and return immediately if there are no messages | |
url_list = [] | |
conn = boto.sqs.connect_to_region(region) | |
import_sqs_dlq_name = "{}-{}".format(import_sqs_queue_name, "DLQ") | |
logger.info("----- Checking DLQ named {} for messages".format(import_sqs_dlq_name)) | |
q = conn.get_queue(import_sqs_dlq_name) | |
dlq_q_size = q.count() | |
if dlq_q_size == 0: | |
return (dlq_q_size, url_list) | |
logger.info("----- Found DLQs dlq_count: {} messages on DLQ named {}".format(dlq_q_size, | |
import_sqs_dlq_name)) | |
# Get a single message and figure out the folder path leading to the object | |
rs = q.get_messages(num_messages=1) | |
if len(rs) > 0: | |
m = rs[0] | |
body = json.loads(m.get_body()) | |
message_internal = json.loads(body["Message"]) | |
records = message_internal["Records"] | |
# Grab the first record | |
if len(records) > 0: | |
record = records[0] | |
bucket = record['s3']['bucket']['name'] | |
key = record['s3']['object']['key'] | |
logger.info("collects_DLQ_stats : bucket={} and key={}".format(bucket, key)) | |
# Remove the trailing '/' if it exists | |
if key.endswith('/'): | |
key = key[:-1] | |
new_key = os.path.dirname(key) | |
url = 'https://console.aws.amazon.com/s3/home?region={}&bucket={}&prefix={}'.format(region, | |
import_airflow_importer_failures_bucket_name, | |
new_key) | |
url_list.append(url) | |
logger.info("collects_DLQ_stats : URL LIST = %s" % (url_list)) | |
return (dlq_q_size, url_list) | |
# Collect stats for validation | |
def collects_stats_for_validation(**kwargs): | |
''' | |
Report data discrepancies at each of the stages of the EP data pipeline: | |
* Stage 1 : Customer --> Collector files | |
* Stage 2 : Collector files --> run_aggr --> agg files | |
* Stage 3 : agg files --> importer --> DB | |
For each data source, we want org:{rda_count, message count}, though the first iteration | |
might just contain message counts. | |
''' | |
# Pre-Stage 1 : Check Collector input counts | |
collector_org_counts_dict = get_collector_histos(**kwargs) | |
# Post-Stage 2 : Check Agg counts and skipped counts | |
agg_avro_org_counts_dict = get_agg_avro_histos(kwargs['execution_date']) | |
agg_avro_org_skipped_counts_dict = get_agg_avro_skipped_histos(kwargs['execution_date']) | |
agg_avro_org_to_skip_type_to_count_dict = get_agg_avro_skipped_by_type_histos(kwargs['execution_date']) | |
# Post-Stage 3 : Check DB counts | |
db_org_counts_dict = get_record_org_histos_for_tables(kwargs['execution_date']) | |
return {'collector_org_counts_dict':collector_org_counts_dict, | |
'db_org_counts_dict':db_org_counts_dict, | |
'agg_avro_org_counts_dict':agg_avro_org_counts_dict, | |
'agg_avro_org_skipped_counts_dict':agg_avro_org_skipped_counts_dict, | |
'agg_avro_org_to_skip_type_to_count_dict':agg_avro_org_to_skip_type_to_count_dict | |
} | |
# Validate the pipeline stages | |
def validate_stages(**kwargs): | |
# Validate the stages | |
return validate_stages_util(**kwargs) | |
# get a comma-delimited string of org_ids to send to spark jobs | |
def get_active_org_ids_string(): | |
org_dict = get_org_ingest_dict() | |
org_ids = sorted([ int(id_) for id_ in org_dict.keys() if org_dict[id_] ]) # sorted for easier debugging | |
return ','.join([ str(id_) for id_ in org_ids ]) | |
def get_org_ingest_dict(): | |
db_conn = psycopg2.connect(import_ep_db_connect_string) | |
logger.info("----- Successfully Connected to database {}".format(import_ep_db_connect_string)) | |
cursor = db_conn.cursor() | |
logger.info("----- Executing the following query against the db : {}".format(GET_ORGS_INGEST_QUERY)) | |
cursor.execute(GET_ORGS_INGEST_QUERY) | |
result = dict() | |
for r in cursor.fetchall(): | |
result[r[0]] = r[1] | |
cursor.close() | |
db_conn.close() | |
return result | |
def validate_stages_util(**kwargs): | |
stages_org_counts_dict = collects_stats_for_validation(**kwargs) | |
pp.pprint(stages_org_counts_dict) | |
# We start with list of tuples (org_id, message_count): one for db | |
collector_org_message_count_dict = stages_org_counts_dict['collector_org_counts_dict'] | |
db_org_message_count_tuples = stages_org_counts_dict['db_org_counts_dict']['MESSAGE_ORG_HISTO'] | |
db_org_nrt_message_count_tuples = stages_org_counts_dict['db_org_counts_dict']['NRT_MESSAGE_ORG_HISTO'] | |
agg_avro_org_counts_dict = stages_org_counts_dict['agg_avro_org_counts_dict'] | |
agg_avro_org_skipped_counts_dict = stages_org_counts_dict['agg_avro_org_skipped_counts_dict'] | |
agg_avro_org_to_skip_type_to_count_dict = stages_org_counts_dict['agg_avro_org_to_skip_type_to_count_dict'] | |
# Make a set of the different types of skips seen. These will be additional columns per org. | |
# This set of columns is dynamic because we want to support anything returned by the | |
# aggregation/scoring code | |
skip_types_seen = set() | |
for org_id, skip_type_and_count_dict in agg_avro_org_to_skip_type_to_count_dict.items(): | |
for skip_type, count in skip_type_and_count_dict.items(): | |
skip_types_seen.add(skip_type) | |
logger.info("skip_types_seen = {}".format(skip_types_seen)) | |
# Note : I won't be providing 0-value defaults for the skip type dicts | |
# There may be elements in the upstream lists that are not in the downstream lists. | |
# Assumption : Upstream lists are more complete in terms of org representation. | |
# Downstream bugs can cause duplicate counting for an org, but they won't cause an org | |
# to be fabricated from thin air | |
# Create dicts, where the key is org_id, value is message counts | |
db_message_counts_keyed_by_org_dict = dict(db_org_message_count_tuples) | |
db_nrt_message_counts_keyed_by_org_dict = dict(db_org_nrt_message_count_tuples) | |
# Look up the pre and post-agg run orgs to determine which one were | |
# ingest-enabled for the entire run. For those, we will compute discrepancies | |
orgs_ingest_lookup_post_run = get_org_ingest_dict() | |
orgs_ingest_lookup = get_finalized_list_of_ingest_enabled_orgs(orgs_ingest_lookup_post_run, **kwargs) | |
# Create a dict of tuples called message_stats_by_stage. The key is org_id, the value is a tuple | |
# where each member represents counts for a stage | |
message_stats_by_stage = dict() | |
for col_org_id, col_msg_cnt in collector_org_message_count_dict.iteritems(): | |
# If an org is present in the DB, use its count, else use 0 | |
db_msg_cnt = 0 | |
if col_org_id in db_message_counts_keyed_by_org_dict: | |
db_msg_cnt = db_message_counts_keyed_by_org_dict[col_org_id] | |
db_nrt_msg_cnt = 0 | |
if col_org_id in db_nrt_message_counts_keyed_by_org_dict: | |
db_nrt_msg_cnt = db_nrt_message_counts_keyed_by_org_dict[col_org_id] | |
# If an org is present in the agg_avro, use its count, else use 0 | |
agg_avro_msg_cnt = 0 | |
if col_org_id in agg_avro_org_counts_dict: | |
agg_avro_msg_cnt = agg_avro_org_counts_dict[col_org_id] | |
# If an org is present in the agg_avro_skipped, use its count, else use 0 | |
agg_avro_msg_skipped_cnt = 0 | |
if col_org_id in agg_avro_org_skipped_counts_dict: | |
agg_avro_msg_skipped_cnt = agg_avro_org_skipped_counts_dict[col_org_id] | |
# Put all results into a tuple | |
message_stats_by_stage[col_org_id] = (col_msg_cnt, | |
agg_avro_msg_cnt, | |
db_msg_cnt, | |
agg_avro_msg_skipped_cnt, | |
skip_types_seen, | |
agg_avro_org_to_skip_type_to_count_dict, | |
db_nrt_msg_cnt) | |
# Sort Dictionary by key | |
message_stats_by_stage_ord_by_org = OrderedDict(sorted(message_stats_by_stage.items())) | |
# Print out some info to logging | |
for org_id, message_counts in message_stats_by_stage_ord_by_org.iteritems(): | |
logger.info("ORG:{} ==> Collector Count: {}, Agg Avro Count: {}, DB Count: {}, Agg Avro Skipped: {}" \ | |
.format(str(org_id), | |
str(message_counts[0]), | |
str(message_counts[1]), | |
str(message_counts[2]), | |
str(message_counts[3]))) | |
        # If not all of the collector messages were skipped, compute the discrepancy
if message_counts[0] != message_counts[3]: | |
discrepancy = message_counts[0] - message_counts[2] - message_counts[3] | |
discrepancy_pct = perc(discrepancy, message_counts[0]) | |
if orgs_ingest_lookup[org_id]: | |
le_alert_dict = { 'org_id': org_id, 'collector_count': message_counts[0], 'db_count': message_counts[2], 'discrepancy_count': discrepancy, 'discrepancy_pct': discrepancy_pct } | |
alert_str = "Telemetry pipeline discrepancy - " + json.dumps(le_alert_dict) | |
logger.warning(alert_str) | |
return message_stats_by_stage_ord_by_org | |
# This method combines pre-run and post-run views of the ingest-enabled orgs | |
def get_finalized_list_of_ingest_enabled_orgs(orgs_ingest_lookup_post_run, **kwargs): | |
''' | |
    For an org to be included in the finalized list, it needs to have been enabled for ingest
during both pre- and post-run. We really mean pre- and post-aggregation run here! | |
''' | |
orgs_ingest_lookup_pre_run = kwargs['task_instance'].xcom_pull(task_ids='discover_ingest_enabled_orgs') | |
finalized_dict = dict() | |
for key, pre_value in orgs_ingest_lookup_pre_run.iteritems(): | |
# Handle case where an org expires or is deleted since the pre_run lookup. We don't want to get a key error | |
# on orgs_ingest_lookup_post_run[key] | |
post_value = False | |
if key in orgs_ingest_lookup_post_run: | |
post_value = orgs_ingest_lookup_post_run[key] | |
logger.info("Ingest Enabled Orgs : key={}, pre_value={}, post_value={}".format(key, | |
pre_value, | |
post_value)) | |
if pre_value and post_value: | |
finalized_dict[key] = True | |
else: | |
finalized_dict[key] = False | |
return finalized_dict | |
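# Illustrative example (org ids are hypothetical): with a pre-run lookup of
# {1: True, 2: True, 3: False} and a post-run lookup of {1: True, 2: False},
# the finalized dict is {1: True, 2: False, 3: False}; only org 1 was
# ingest-enabled both before and after the aggregation run.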
def generate_successful_email_util(start_dt, reload_end_dt, **kwargs): | |
logger.info("Starting to generate success email") | |
# Return immediately if email is disabled | |
if not import_airflow_enable_notifications: | |
logger.info("Skipping success email because airflow notifications are disabled") | |
return | |
me = import_ep_pipeline_success_email_from | |
you = import_ep_pipeline_success_email_dl | |
# Create message container - the correct MIME type is multipart/alternative. | |
msg = MIMEMultipart('alternative') | |
msg['Subject'] = "Airflow EP Data Load Complete [{}]".format(ENV) | |
msg['From'] = me | |
msg['To'] = you | |
if 'reload_data' in kwargs: | |
logger.info("Email notification being generated for reload data script") | |
from_dag = False | |
data_path = 'Reload Data Script' | |
url = '' | |
rld_any_dlq_messages = kwargs['any_dlq_messages'] | |
rld_all_data_in_db = kwargs['all_data_in_db'] | |
# Craft a status message for reload_data | |
if not rld_any_dlq_messages and rld_all_data_in_db: | |
rld_status_message = "Success" | |
else: | |
failure_reasons = [] | |
if not rld_all_data_in_db: | |
failure_reasons.append("Some Dates Have No Data!") | |
if rld_any_dlq_messages: | |
failure_reasons.append("DLQ Messages Found") | |
rld_status_message = "Failure : {}".format(failure_reasons) | |
logger.info("Success email being generated for Reload Data Script") | |
else: | |
logger.info("Email notification being generated for airflow run") | |
from_dag = True | |
data_path = 'Airflow' | |
base_url = kwargs['conf'].get('webserver', 'base_url') | |
dag_id = kwargs['dag'].dag_id | |
url = "<a href='{}/admin/airflow/tree?num_runs=25&root=&dag_id={}'>Airflow</a>".format(base_url, dag_id) | |
logger.info("Success email being generated for Airflow run") | |
# Create the body of the message (a plain-text and an HTML version). | |
text = "Hi EP Folks!\nThe EP Data Pipeline ({}) Loaded Data in the {} environment for ".format(data_path, ENV) | |
if (reload_end_dt): | |
text += "period: {} to {} GMT/UTC. ".format(start_dt, reload_end_dt) | |
text += "\n The run status is {}".format(rld_status_message) | |
text += "\n The run included options: {}".format(kwargs['reload_opts']) | |
else: | |
text += "day: {} GMT/UTC".format(start_dt) | |
# Build an html table for rda and message counts for the past N days in the DB | |
histo_end_date = start_dt if reload_end_dt == None else reload_end_dt | |
histos_dict = get_record_date_histos_for_tables(start_dt, histo_end_date, reload_data=(not from_dag)) | |
html_table = """\ | |
<table border='1' style='width:100%'> | |
<tr><th>Day</th><th>RDAs</th><th>Messages</th></tr> | |
""" | |
for msg_row, rda_row in zip(histos_dict['MESSAGE_HISTO'], histos_dict['RDA_HISTO']): | |
html_table +="<tr><td>{}</td><td>{}</td><td>{}</td></tr>".format(rda_row[0], rda_row[1], msg_row[1]) | |
html_table += "</p></table>" | |
# Build a list of DLQ Exceptions | |
dlq_q_size, dlq_msg_exception_s3_url_list = collects_DLQ_stats_simple() | |
# Report via Slack | |
report_issues_on_slack_for_DLQs(dlq_q_size, dlq_msg_exception_s3_url_list, **kwargs) | |
html_DLQ_exception_list = '' | |
if len(dlq_msg_exception_s3_url_list) > 0: | |
html_DLQ_exception_list = """ | |
<p><strong>Some DLQ Exceptions (total DQL Messages = {})</strong><br></p> | |
</p> | |
""".format(dlq_q_size) | |
        for dlq_exception_url in dlq_msg_exception_s3_url_list:
            link = """<p><a href="{}/">DLQ Exceptions</a></p>""".format(dlq_exception_url)
            html_DLQ_exception_list += link
# Build an html table of message_stats_by_stage | |
orgs_ingest_lookup = get_org_ingest_dict() | |
html_table_message_stats_by_stage = '' | |
if from_dag: | |
html_table_message_stats_by_stage = 'From Airflow start_date = {}'.format(start_dt) | |
message_stats_by_stage = validate_stages(**kwargs) | |
# Insert discrepancy metrics in DB! | |
insert_discrepancy_metrics_in_db(start_dt, message_stats_by_stage, dlq_q_size, orgs_ingest_lookup, **kwargs) | |
html_table_message_stats_by_stage = """\ | |
<p><strong>Message Counts by Org and by Pipeline Stage Output</strong><br></p> | |
<p> | |
<table border="1" style="width:100%"> | |
<tr><th>Org_ID</th><th>Collector Msgs</th><th>Agg Msgs</th><th>DB Msgs</th><th>Discrepancy</th><th>Discr %</th><th>NRT DB Msgs</th><th>NRT Discrepancy</th><th>NRT Discr %</th><th>Skipped</th><th>Notes</th></tr> | |
""" | |
for org_id, message_counts in message_stats_by_stage.iteritems(): | |
pipeline_stats = compute_pipeline_stats_for_org(org_id, message_counts, orgs_ingest_lookup) | |
html_table_message_stats_by_stage += """\ | |
<tr> | |
<td>{org_id}</td> | |
<td>{collector_text}</td> | |
<td>{agg_count}</td> | |
<td>{db_message_count}</td> | |
<td>{discrepancy_count}</td> | |
<td>{discrepancy_pct}</td> | |
<td>{db_nrt_message_count}</td> | |
<td>{nrt_discrepancy_count}</td> | |
<td>{nrt_discrepancy_pct}</td> | |
<td>{skipped_count}</td> | |
<td>{notes}</td> | |
</tr>""".format(**pipeline_stats) | |
html_table_message_stats_by_stage += "</table></p>" | |
#Final template | |
html = """\ | |
<html> | |
<head> | |
</head> | |
<body bgcolor="#e3ecfc"> | |
<p> | |
{} | |
<br> | |
{} | |
</p> | |
{} | |
{} | |
<br> | |
<p> | |
{} | |
</p> | |
<br> | |
<img src="{}" width="100" height="100"> | |
</body> | |
</html> | |
""".format(text, | |
url, | |
html_table, | |
html_table_message_stats_by_stage, | |
html_DLQ_exception_list, | |
random.choice(_EYE_CANDY_LIST)) | |
# Record the MIME types of both parts - text/plain and text/html. | |
part1 = MIMEText(html, 'html') | |
part2 = MIMEText(text, 'plain') | |
# Attach parts into message container. | |
# According to RFC 2046, the last part of a multipart message, in this case | |
# the HTML message, is best and preferred. | |
    msg.attach(part2)
    msg.attach(part1)
# Send the message via local SMTP server. | |
s = smtplib.SMTP(_MANDRILL_API_ENDPOINT) | |
# sendmail function takes 3 arguments: sender's address, recipient's address | |
# and message to send - here it is sent as one string. | |
s.login(import_ep_pipeline_mandrill_creds_dict['username'], | |
import_ep_pipeline_mandrill_creds_dict['password']) | |
s.sendmail(me, you, msg.as_string()) | |
s.quit() | |
logger.info("Success email sent") | |
def compute_pipeline_stats_for_org(org_id, message_counts, orgs_ingest_lookup): | |
if message_counts[0] == -1: | |
collector_text = 'Error in collector avro!' | |
collector_count = 0 | |
else: | |
collector_count = collector_text = message_counts[0] | |
agg_count = message_counts[1] | |
db_message_count = message_counts[2] | |
db_nrt_message_count = message_counts[6] | |
skipped_count = message_counts[3] | |
discrepancy_count = compute_discr(collector_count - db_message_count - skipped_count, collector_count) | |
discrepancy_pct = perc(collector_count - db_message_count - skipped_count, collector_count) | |
nrt_discrepancy_count = compute_discr(collector_count - db_nrt_message_count - skipped_count, collector_count) | |
nrt_discrepancy_pct = perc(collector_count - db_nrt_message_count - skipped_count, collector_count) | |
notes = '' | |
if not orgs_ingest_lookup[org_id]: | |
notes += 'Sandboxed ({})'.format(discrepancy_count) | |
discrepancy_pct = 0.0 | |
discrepancy_count = 0 | |
return { | |
'org_id': org_id, | |
'collector_text': collector_text, | |
'collector_count': collector_count, | |
'agg_count': agg_count, | |
'db_message_count': db_message_count, | |
'db_nrt_message_count': db_nrt_message_count, | |
'skipped_count': skipped_count, | |
'discrepancy_count': discrepancy_count, | |
'discrepancy_pct': discrepancy_pct, | |
'nrt_discrepancy_count': nrt_discrepancy_count, | |
'nrt_discrepancy_pct': nrt_discrepancy_pct, | |
'notes': notes, | |
} | |
# Compute discrepancy | |
def compute_discr(disc, collector_count): return 'Undefined' if collector_count == 0 else (disc) | |
# Compute a percentage | |
def perc(part, whole): return 'Undefined' if whole == 0 else (100 * round(float(part)/float(whole), 3)) | |
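# Illustrative examples of the two helpers above (numbers are hypothetical):
#   compute_discr(5, 100) -> 5            perc(5, 100) -> 5.0   (i.e. 100 * round(5/100.0, 3))
#   compute_discr(5, 0)   -> 'Undefined'  perc(5, 0)   -> 'Undefined'  (no collector messages to compare against)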
def send_email_notification_flow_successful(ds, **kwargs): | |
start_dt, end_dt = get_start_and_end_datetime(**kwargs) | |
generate_successful_email_util(start_dt, None, **kwargs) | |
return | |
# This function downloads target collector files, parses them using an | |
# Avro reader, and then counts the number of records per org | |
def get_collector_file_message_counts_per_org(execution_date): | |
start_ts = execution_date.strftime(SPARK_DATE_FORMAT) | |
end_ts = compute_end_dt(execution_date, None).strftime(SPARK_DATE_FORMAT) | |
key_name = 'msg_counts/collector_msg_count_{}_{}.json'.format(start_ts, end_ts) | |
org_msg_count_dict = json.load(boto.connect_s3().get_bucket(import_airflow_importer_metadata_bucket_name).get_key(key_name)) | |
return org_msg_count_dict | |
def get_collector_file_message_counts_per_org_old(ingest_disabled_orgs, execution_date): | |
# Create a local temp dir | |
temp_local_dir = tempfile.mkdtemp() | |
print "----------- get_collector_file_message_counts_per_org: temp local dir = {} for execution date = {}, skipping {}".format(temp_local_dir, | |
execution_date, | |
ingest_disabled_orgs) | |
# Convert ds into the appropriate date format to be used for identifying collector files! | |
collector_date_string = execution_date.strftime(COLLECTOR_VALIDATE_DATE_FORMAT) | |
# Generate exclude conditions for each org we want to skip | |
ingest_disabled_exclusion_list = '' | |
for org_id in ingest_disabled_orgs: | |
ingest_disabled_exclusion_list += " --exclude '*uploads/{}/*'".format(org_id) | |
print "get_collector_file_message_counts_per_org: ingest_disabled_exclusion_list = {}".format(ingest_disabled_exclusion_list) | |
# Download the collector files | |
command = "aws s3 cp --recursive --exclude '*' --include '*{}*' {} --exclude '*.gz' s3://{}/ {} ".format(collector_date_string, | |
ingest_disabled_exclusion_list, | |
import_airflow_s3_collector_ingest_bucket_name, | |
temp_local_dir) | |
_call_subprocess_simple(command, tries=4, sleep_secs=5) | |
# Counts the message per org | |
org_msg_count_dict = dict() | |
for org_dir in os.listdir("%s/uploads" % (temp_local_dir)): | |
temp_file = tempfile.NamedTemporaryFile() | |
try: | |
command = "for f in `find {}/{} -type f`; do echo $f; avro cat $f >> {} ; done".format(temp_local_dir, | |
"uploads/%s"%(org_dir), | |
temp_file.name) | |
_call_subprocess_simple(command) | |
temp_file.seek(0) | |
msg_count = 0 | |
for line in temp_file: | |
msg_count = msg_count + 1 | |
print "msg count = {}".format(msg_count) | |
org_msg_count_dict[org_dir] = msg_count | |
finally: | |
# Automatically cleans up the file | |
temp_file.close() | |
# Clean up the temp local dir | |
shutil.rmtree(temp_local_dir) | |
    # Print out the per-org message counts
pp.pprint(org_msg_count_dict) | |
# Returns a dict such as {'1': 52471, '11': 1, '2': 81, '5': 81, '6': 43383, '9': 81} | |
return org_msg_count_dict | |
# Helper method to handle subprocess failures | |
def _call_subprocess_simple(command, suppress_logging=False, allow_error=False, tries=2, sleep_secs=1): | |
if not suppress_logging: | |
logging_message = "Running : %s" % command | |
logger.info(logging_message) | |
# Attempt a "Try" number of times | |
attempts = 0 | |
ret_code = -9 | |
    while attempts < tries and ret_code != 0:
        logging_message = "%s Attempt at Running : %s" % (attempts, command)
        if not suppress_logging:
            logger.info(logging_message)
        ret_code = os.system(command)
        attempts = attempts + 1
        if ret_code != 0:
            time.sleep(sleep_secs)  # Wait sleep_secs before retrying a failed command
# If we allow errors, continue | |
error_message = "FAILURE on %s Attempts at Running : %s" % (attempts, command) | |
if allow_error == True: | |
return ret_code | |
# Otherwise, log a critical error | |
if ret_code != 0: | |
logger.critical(error_message) | |
# Always return the ret_code | |
return ret_code | |
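# Illustrative usage (mirrors the call made in get_collector_file_message_counts_per_org_old;
# the bucket path is hypothetical):
#   ret_code = _call_subprocess_simple("aws s3 ls s3://some-bucket/", tries=4, sleep_secs=5)
#   A non-zero ret_code means the command failed on every attempt and a critical error was logged.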
# Convenience method to get start and end datetimes | |
def get_start_and_end_datetime(**kwargs): | |
end_date = kwargs['execution_date'] + timedelta(seconds=int(import_airflow_granularity_secs)) | |
return (kwargs['execution_date'], end_date) | |
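# Illustrative example (dates are hypothetical): with ep_pipeline_granularity_secs = 3600 and
# kwargs['execution_date'] = datetime(2016, 4, 29, 6, 0), this returns the one-hour window
# (datetime(2016, 4, 29, 6, 0), datetime(2016, 4, 29, 7, 0)).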
# Slack-ops : Report DLQ issues on Slack
def report_issues_on_slack_for_DLQs(dlq_q_size, dlq_msg_exception_s3_url_list, **kwargs): | |
if dlq_q_size > 0: | |
print '------------> {}'.format(dlq_msg_exception_s3_url_list[0]) | |
DLQs_found_text = "{} - {} on {} completed `{}` with {} DLQs : <{} | Sample Exception> ".format(ENV, | |
kwargs["dag"].dag_id, | |
kwargs["ti"].hostname, | |
kwargs["execution_date"], | |
dlq_q_size, | |
'{}'.format(dlq_msg_exception_s3_url_list[0])) | |
report_issues_on_slack_util(DLQs_found_text) | |
# Slack-ops : Report discrepancy graph on Slack | |
def report_issues_on_slack_for_high_discrepancies(pipeline_stats, **kwargs): | |
base_url = kwargs['conf'].get('webserver', 'base_url') | |
collector_file_count_threshold = int(import_ep_pipeline_discrepancy_alerting_config['collector_file_count_threshold']) | |
discrepancy_percentage_alerting_threshold = int(import_ep_pipeline_discrepancy_alerting_config['discrepancy_percentage_alerting_threshold']) | |
discrepancy_chart_id = int(import_ep_pipeline_discrepancy_alerting_config['discrepancy_chart_id']) | |
url = '{}/admin/airflow/chart?chart_id={}'.format(base_url, discrepancy_chart_id) | |
if pipeline_stats['collector_count'] > collector_file_count_threshold and pipeline_stats['discrepancy_pct'] > discrepancy_percentage_alerting_threshold: | |
        high_discrepancy_text = "{} - {} on {} completed `{}` with <{} | High Discrepancies>".format(ENV,
                                                                                                     kwargs["dag"].dag_id,
                                                                                                     kwargs["ti"].hostname,
                                                                                                     kwargs["execution_date"],
                                                                                                     url)
        report_issues_on_slack_util(high_discrepancy_text)
# Slack-ops : Slack reporting - util function | |
def report_issues_on_slack_util(text): | |
if import_ep_ops_slack_alerting_enabled: | |
token = slack_api_token | |
sc = SlackClient(token) | |
sc.api_call("chat.postMessage", **{"channel":ep_ops_slack_channel_name, | |
"username":"Airflow", | |
"icon_url":'https://raw.githubusercontent.com/airbnb/airflow/master/airflow/www/static/pin_100.png', | |
"text":text}) | |
# Insert discrepancy metrics in the db | |
def insert_discrepancy_metrics_in_db(start_dt, message_stats_by_stage, dlq_q_size, orgs_ingest_lookup, **kwargs): | |
# Delete overlapping data | |
delete_db_discrepancy_data(import_ep_db_connect_string, start_dt) | |
# Initialize the DB Conn and cursor | |
db_conn = psycopg2.connect(import_ep_db_connect_string) | |
logger.info("----- Successfully Connected to database {}".format(import_ep_db_connect_string)) | |
cursor = db_conn.cursor() | |
INSERT_SQL = '''INSERT INTO telemetry_data_discrepancy (execution_date, organization_id, granularity_secs, collector_msgs, agg_msgs, db_msgs, db_nrt_msgs, discrepancy, discrepancy_percentage, agg_filtered_gm_spam_int, dlq_q_size, created_at, updated_at) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)''' | |
# Given a dict[org_id ==> message_counts] | |
for org_id, message_counts in message_stats_by_stage.iteritems(): | |
pipeline_stats = compute_pipeline_stats_for_org(org_id, message_counts, orgs_ingest_lookup) | |
# Initialize key columns | |
granularity_sec = import_airflow_granularity_secs | |
# Current time | |
now = datetime.now() | |
# Pass data to fill a query placeholders and let Psycopg perform | |
# the correct conversion (no more SQL injections!) | |
logger.info("----- Executing the following query against the db : {}".format(INSERT_SQL)) | |
cursor.execute(INSERT_SQL, (start_dt, | |
pipeline_stats['org_id'], | |
granularity_sec, | |
pipeline_stats['collector_count'], | |
pipeline_stats['agg_count'], | |
pipeline_stats['db_message_count'], | |
pipeline_stats['db_nrt_message_count'], | |
pipeline_stats['discrepancy_count'], | |
pipeline_stats['discrepancy_pct'], | |
pipeline_stats['skipped_count'], | |
dlq_q_size, | |
now, | |
now)) | |
db_conn.commit() #close the transaction | |
# If discrepancies rise above a certain level, report them on slack for each org! | |
report_issues_on_slack_for_high_discrepancies(pipeline_stats, **kwargs) | |
# close the cursor and connection | |
cursor.close() | |
db_conn.close() | |
def spark_agg_retry(context): | |
#purge queue | |
logger.info("----- Preparing for aggregation retry") | |
purge_sqs_queue(import_sqs_queue_name, purge_if_empty=True) | |
start_dt = context['execution_date'] | |
end_dt = start_dt + timedelta(seconds=int(import_airflow_granularity_secs)) | |
# Delete data | |
delete_db_data_util(import_ep_db_connect_string, start_dt, end_dt) | |
def purge_sqs_queue(queue_name, purge_if_empty=False): | |
conn = boto.sqs.connect_to_region(region) | |
q = conn.get_queue(queue_name) | |
q_size = q.count() | |
logger.info("----- Queue {} has {} messages".format(queue_name, q_size)) | |
if purge_if_empty or q_size > 0: | |
logger.info("----- About to purge {}".format(queue_name)) | |
was_successful = q.purge() | |
logger.info("----- Purged {} and received a return status of {}".format(queue_name, | |
was_successful)) | |
else: | |
was_successful = True | |
return was_successful | |