backfill_fennec_dashboard_weekly

# coding: utf-8

# In[1]:

import ujson as json
import datetime as dt
import os.path
import boto3  # S3
import botocore
import calendar
from os import listdir
from moztelemetry import get_pings, get_pings_properties, get_one_ping_per_client
from pprint import pprint

get_ipython().magic(u'pylab inline')

# Let's pick the report we want to generate here.

# In[2]:

def snap_to_past_sunday(date):
    """ Get the closest Sunday on or before the given date. """
    # We need weeks starting on Sunday rather than Monday,
    # so account for that with |(date.weekday() + 1) % 7|.
    return date - dt.timedelta(days=((date.weekday() + 1) % 7))

def snap_to_beginning_of_month(date):
    """ Get the date for the first day of this month. """
    return date.replace(day=1)

def get_last_week_range():
    today = dt.date.today()
    # Get the first day of the last complete week.
    start_of_week = snap_to_past_sunday(today) - dt.timedelta(weeks=1)
    end_of_week = start_of_week + dt.timedelta(days=6)
    return (start_of_week, end_of_week)

def get_last_month_range():
    today = dt.date.today()
    # Get the last day of the previous month.
    end_of_last_month = snap_to_beginning_of_month(today) - dt.timedelta(days=1)
    start_of_last_month = snap_to_beginning_of_month(end_of_last_month)
    return (start_of_last_month, end_of_last_month)
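
# Illustrative sanity check of the date helpers above. The sample date is
# arbitrary (not taken from the original notebook); 2016-05-18 was a Wednesday.
example_date = dt.date(2016, 5, 18)
assert snap_to_past_sunday(example_date) == dt.date(2016, 5, 15)         # the previous Sunday
assert snap_to_beginning_of_month(example_date) == dt.date(2016, 5, 1)   # first day of the month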

# ### Fetch the core pings
# First thing, pick a submission range: either last week or last month.

# In[3]:

def fetch_deduped_pings(sub_range):
    core_pings = get_pings(sc,
                           app="Fennec",
                           doc_type="core",
                           source_version="*",
                           submission_date=sub_range,
                           fraction=fraction)

    # We don't need the whole ping. Just get the props we want.
    subset = get_pings_properties(core_pings, ["clientId",
                                               "osversion",
                                               "os",
                                               "profileDate",
                                               "meta/submissionDate",
                                               "meta/geoCountry",
                                               "meta/appUpdateChannel",
                                               "meta/Timestamp",
                                               "meta/documentId"
                                               ])

    # iOS also submits "core" pings, so filter for Android only.
    android = subset.filter(lambda p: p.get("os", "") == "Android")

    # We can (sadly) have duplicated pings. Apply deduping by document id.
    return android.map(lambda p: (p["meta/documentId"], p))\
                  .reduceByKey(lambda a, b: a)\
                  .map(lambda t: t[1])
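
# Illustrative sketch of the dedupe step above, using plain Python instead of
# an RDD. The ping dicts below are made up for the example.
def _dedupe_by_document_id(pings):
    seen = {}
    for p in pings:
        # Keep one ping per documentId, like reduceByKey(lambda a, b: a).
        seen.setdefault(p["meta/documentId"], p)
    return list(seen.values())

assert len(_dedupe_by_document_id([{"meta/documentId": "a", "clientId": "c1"},
                                   {"meta/documentId": "a", "clientId": "c1"},
                                   {"meta/documentId": "b", "clientId": "c2"}])) == 2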

# ### Aggregate the data
#
# Field documentation:
# * *os_version* - core pings contain the API level; we output the codename + version range instead
# * *geo* - the country code the ping originated from. We only report a fixed set of countries; all others are grouped as "Other"
# * *channel* - the product channel
# * *date* - the first day of the aggregated week/month
# * *actives* - the number of clients that were active in the period. The old dashboard checked 'org.mozilla.appSessions'; here we count the clients that submitted at least one core ping in the period (deduped by client id)
# * *new_records* - clients whose profile creation date equals the submission date
#   * This may not hold for every new profile due to broken clocks, temporary loss of network, ...
# * *d1* - how many clients were active at least once on the day after the profile creation date?
# * *d7* - how many clients were active at least once on the 7th day following profile creation?
# * *d30* - how many clients were active at least once on the 30th day following profile creation?
# * *hours* - session duration in hours. Currently 0.
# * *google, yahoo, bing, other* - Currently 0.
#
# The d1/d7/d30 metrics strictly imply that the user was seen on that exact day, not any day before (e.g. d7 means the user was seen on profile creation date + 7, not within the [profile creation date, profile creation date + 7] window), as illustrated below.
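
# Worked example of the strict day semantics described above. The dates are
# hypothetical and the helper below is only an illustration, not part of the job.
def _retention_bucket(profile_date, submission_date):
    delta = (submission_date - profile_date).days
    return {1: 'd1', 7: 'd7', 30: 'd30'}.get(delta)

assert _retention_bucket(dt.date(2016, 5, 1), dt.date(2016, 5, 2)) == 'd1'
assert _retention_bucket(dt.date(2016, 5, 1), dt.date(2016, 5, 8)) == 'd7'
# Seen on day 6 after creation: counts towards nothing, not even d7.
assert _retention_bucket(dt.date(2016, 5, 1), dt.date(2016, 5, 7)) is None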

# In[4]:

COUNTRIES_OF_INTEREST = set(['US', 'CA', 'BR', 'MX', 'FR', 'ES', 'IT', 'PL',
                             'TR', 'RU', 'DE', 'IN', 'ID', 'CN', 'JP', 'GB'])

def get_country(original_country):
    return original_country if original_country in COUNTRIES_OF_INTEREST else 'Other'

def android_api_level_to_version(api_level):
    """
    The core ping stores the API level, but we need to display the OS version/codename.
    We can map API level -> codename; the related information is available at
    https://source.android.com/source/build-numbers.html
    """
    API_MAP = {
        '8': 'Froyo (2.2 - 2.2.3)',
        '9': 'Gingerbread (2.3 - 2.3.7)',
        '10': 'Gingerbread (2.3 - 2.3.7)',
        '11': 'Honeycomb (3.0 - 3.2.6)',
        '12': 'Honeycomb (3.0 - 3.2.6)',
        '13': 'Honeycomb (3.0 - 3.2.6)',
        '14': 'Ice Cream Sandwich (4.0 - 4.0.4)',
        '15': 'Ice Cream Sandwich (4.0 - 4.0.4)',
        '16': 'Jelly Bean (4.1 - 4.3.x)',
        '17': 'Jelly Bean (4.1 - 4.3.x)',
        '18': 'Jelly Bean (4.1 - 4.3.x)',
        '19': 'KitKat (4.4 - 4.4.4)',
        '21': 'Lollipop (5.0 - 5.1)',
        '22': 'Lollipop (5.0 - 5.1)',
        '23': 'Marshmallow (6.0)',
    }
    return API_MAP.get(api_level, 'Other')
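
# Illustrative usage of the two helpers above (values picked for the example):
assert android_api_level_to_version('19') == 'KitKat (4.4 - 4.4.4)'
assert android_api_level_to_version('20') == 'Other'   # unknown levels fall back to 'Other'
assert get_country('DE') == 'DE'
assert get_country('ZZ') == 'Other'                    # countries outside the set are grouped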

# In[5]:

def parse_to_unix_days(s):
    """ Converts a YYYYMMDD string to days since the Unix epoch. """
    return (dt.datetime.strptime(s, "%Y%m%d") - dt.datetime(1970, 1, 1)).days
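
# Illustration (hypothetical date): 1970-01-02 is one day after the epoch.
assert parse_to_unix_days("19700102") == 1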

def get_file_name(report_type, suffix=""):
    return "fennec-v4-" + report_type + suffix + ".csv"

def safe_increment(p, key):
    """ Safely increments p[key], treating a missing key as 0. """
    p[key] = p.get(key, 0) + 1

def is_new_profile(submission_epoch, profile_epoch):
    """
    Determines if this is a new profile by checking if the submission date
    equals the profile creation date.
    """
    return submission_epoch == profile_epoch

def get_key(p, segments, report_type):
    """ Build a key tuple with the dimensions we want to aggregate over. """
    dims = []

    # Translate the API version to a name.
    if 'osversion' in segments:
        dims.append(android_api_level_to_version(p.get('osversion')))
    else:
        dims.append('all')

    # Only report the countries of interest; group the rest as "Other".
    if 'meta/geoCountry' in segments:
        dims.append(get_country(p.get('meta/geoCountry')))
    else:
        dims.append('all')

    # The product update channel.
    if 'meta/appUpdateChannel' in segments:
        dims.append(p.get('meta/appUpdateChannel'))
    else:
        dims.append('all')

    # Append the date last. In the weekly mode, that's the first day of the
    # submission week; in the monthly mode, that's the first day of the month.
    submission_date = dt.datetime.strptime(p.get("meta/submissionDate"), "%Y%m%d")
    date_string = ""
    if report_type == "monthly":
        date_string = snap_to_beginning_of_month(submission_date).strftime("%Y%m01")
    else:
        date_string = snap_to_past_sunday(submission_date).strftime("%Y%m%d")
    dims.append(date_string)

    return tuple(dims)
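
# Illustrative example of the key produced by get_key(); the ping values are
# made up for the example. For a weekly report segmented over all three
# dimensions, the submission date gets snapped back to the previous Sunday.
_example_ping = {
    'osversion': '19',
    'meta/geoCountry': 'DE',
    'meta/appUpdateChannel': 'release',
    'meta/submissionDate': '20160518',
}
assert get_key(_example_ping, ['osversion', 'meta/geoCountry', 'meta/appUpdateChannel'], "weekly") == \
    ('KitKat (4.4 - 4.4.4)', 'DE', 'release', '20160515')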

def run_query(pings, segments, report_type):
    """
    Aggregate the pings over the dimensions in "segments". We start by generating a key for each
    ping by chaining the values of the dimensions of interest. If we don't care about a particular
    dimension, its value is set to "all".
    All the pings belonging to a key are aggregated together.
    """
    # Segment the data by indexing it by dimensions.
    segmented_pings = pings.map(lambda p: (get_key(p, segments, report_type), p))

    # We require the profile age to measure retention. Filter out pings that don't have it.
    filtered = segmented_pings.filter(lambda p: p[1].get("profileDate", None) is not None)
    # print "Got {} pings after filtering invalid profileDate(s)".format(filtered.count())

    # For metrics like d1, d7, d30, and new_records we only need one core ping per client per day.
    # Generate a new RDD containing only one ping per client, for each day, within the segment:
    # Step 1 - Append the client id and submission date to the index key.
    # Step 2 - reduceByKey so that we get only one ping per day per client.
    # Step 3 - Strip the client id/submission date off the index key.
    one_per_day = filtered.map(lambda p: ((p[0], p[1].get("clientId"), p[1].get("meta/submissionDate")), p[1]))\
                          .reduceByKey(lambda a, b: a)\
                          .map(lambda p: (p[0][0], p[1]))

    # Compute the aggregated counts.
    def retention_seq(acc, v):
        if not acc:
            acc = {}

        # **** IMPORTANT NOTICE ****
        #
        # Please note that retention *WILL* be broken by broken client clocks and is most
        # certainly affected by clock skew. Once we have clock skew data from the clients
        # we might consider revisiting this to adjust the profileDate accordingly.
        # Another option would be to fetch, in some way, the first ping ever submitted by the
        # client, but that would not be practical due to the data retention policies (luckily).

        submission_epoch = parse_to_unix_days(v['meta/submissionDate'])

        # Check if this ping is from a new profile. If so, increment "new_records".
        if is_new_profile(submission_epoch, v['profileDate']):
            safe_increment(acc, 'new_records')

        # Evaluate the d1, d7, d30 retention metrics. First, get the delta between
        # the submission date and the profile creation date.
        days_after_creation = submission_epoch - v['profileDate']

        # Is the user still engaged after 1 day (d1)?
        if days_after_creation == 1:
            safe_increment(acc, 'd1')
        # And after 7 days (d7)?
        elif days_after_creation == 7:
            safe_increment(acc, 'd7')
        # And after 30 days (d30)?
        elif days_after_creation == 30:
            safe_increment(acc, 'd30')

        return acc

    def cmb(v1, v2):
        # Combine the counts from the two partial dictionaries by summing them key-wise.
        return {k: v1.get(k, 0) + v2.get(k, 0) for k in set(v1) | set(v2)}
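    # Illustrative example of what cmb() does (the values are hypothetical):
    #   cmb({'d1': 1, 'actives': 3}, {'d7': 2, 'actives': 5}) -> {'d1': 1, 'd7': 2, 'actives': 8}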
    retention_defaults = {
        'new_records': 0,
        'actives': 0,
        'd1': 0,
        'd7': 0,
        'd30': 0,
    }
    aggregated_retention = one_per_day.aggregateByKey(retention_defaults, retention_seq, cmb)

    # For each segment, count how many clients were active.
    def count_actives(acc, v):
        acc["actives"] = acc["actives"] + 1
        return acc

    # We aggregate the active user count in a dict to ease joining with the retention counts.
    actives_per_segment = segmented_pings.map(lambda r: ((r[0], r[1].get("clientId")), 1))\
                                         .reduceByKey(lambda x, y: x)\
                                         .map(lambda r: (r[0][0], 1))\
                                         .aggregateByKey({"actives": 0}, count_actives, cmb)

    # Join the RDDs.
    merged = aggregated_retention.join(actives_per_segment).mapValues(lambda r: cmb(r[0], r[1]))
    return merged
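
# The merged RDD produced by run_query() maps each segment key to its counters.
# A collected entry might look like this (the numbers are hypothetical):
#   (('KitKat (4.4 - 4.4.4)', 'DE', 'release', '20160515'),
#    {'actives': 1200, 'new_records': 150, 'd1': 40, 'd7': 12, 'd30': 3})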

# To build a single CSV file, we execute a series of queries and then serialize the output.

# In[6]:

def run_queries(report_type, start_date=None, end_date=None):
    """
    This function has 3 operating modes:
    1. "weekly", which aggregates the data from the last full week and appends the results to the weekly CSV;
    2. "monthly", which aggregates the last full month and appends the results to the monthly CSV;
    3. "backfill", which, given a start and end date, performs weekly or monthly aggregation over that
       period and appends to the CSV file.
    """
    # Each entry represents a different query over a set of dimensions of interest.
    QUERIES = [
        ['osversion', 'meta/geoCountry', 'meta/appUpdateChannel'],
        ['osversion', 'meta/appUpdateChannel'],
        ['osversion', 'meta/geoCountry'],
        ['meta/geoCountry', 'meta/appUpdateChannel'],
        ['osversion'],
        ['meta/geoCountry'],
        ['meta/appUpdateChannel'],
        []
    ]

    # Check start_date and end_date for validity. If invalid, default to last week/month.
    date_range = get_last_month_range() if report_type == "monthly" else get_last_week_range()
    if start_date is not None and end_date is not None:
        sd = snap_to_past_sunday(start_date) if report_type == "weekly" else snap_to_beginning_of_month(start_date)
        date_range = (sd, end_date)

    delta = date_range[1] - date_range[0]
    print "Running summary analysis type {} over {} days for {} to {}"\
          .format(report_type, delta.days, *date_range)

    # Split the submission period into chunks, so we don't run out of resources while aggregating.
    date_chunks = []
    chunk_start = date_range[0]
    print "Breaking date range into chunks:"
    while chunk_start < date_range[1]:
        # Compute the end of this time chunk.
        chunk_end = None
        if report_type == "monthly":
            chunk_end = chunk_start.replace(day=calendar.monthrange(chunk_start.year, chunk_start.month)[1])
        else:
            chunk_end = chunk_start + dt.timedelta(days=6)
        print "* {} to {}".format(chunk_start.strftime("%Y%m%d"), chunk_end.strftime("%Y%m%d"))
        date_chunks.append((chunk_start, chunk_end))
        # Move on to the next chunk by adding one day to the end of the last month or week.
        chunk_start = chunk_end + dt.timedelta(days=1)

    # The results will be in a dict of the form {(os, geo, channel, date): dict, ...}.
    results = {}
    for chunk_start, chunk_end in date_chunks:
        # Fetch the pings we need.
        submissions_range = (chunk_start.strftime("%Y%m%d"), chunk_end.strftime("%Y%m%d"))
        print "\nFetching pings for {} to {}".format(*submissions_range)
        deduped = fetch_deduped_pings(submissions_range)
        #print "Fetched {} pings".format(deduped.count())

        chunk_results = {}
        for query in QUERIES:
            print " * Running query over dimensions: %s" % ", ".join(query)
            query_result = dict(run_query(deduped, query, report_type).collect())
            #pprint(query_result)
            # Add this query's results to the results for this chunk.
            chunk_results.update(query_result)

        # Serialize intermediate results to file, so we don't start from scratch if the batch fails.
        # We append the week/month at the end of the file name.
        serialize_results(chunk_results, report_type, submissions_range[0])

        # Add this chunk's results to the overall dict. We assume the keys DO NOT collide.
        results.update(chunk_results)

    return results
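
# Hypothetical backfill invocation (the dates are just examples, matching the
# ones mentioned in the comments at the bottom of this notebook): aggregate
# week by week, writing one intermediate CSV per chunk plus the combined result.
#
#   backfill_results = run_queries("weekly",
#                                  start_date=dt.datetime(2016, 3, 6),
#                                  end_date=dt.datetime(2016, 3, 19))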

# ### CSV and S3 utility functions.
# Some utility functions to read/write state from/to the S3 store.

# In[7]:

S3_ANALYSIS_BUCKET = "net-mozaws-prod-us-west-2-pipeline-analysis"
S3_ANALYSIS_BASE_PATH = "aplacitelli/"
S3_DASHBOARD_BUCKET = "net-mozaws-prod-metrics-data"

def fetch_previous_state(report_type):
    """
    To prevent ACL issues and to prevent files from disappearing from the dashboard bucket,
    we fetch and stage the canonical state from the analysis bucket, only sending the updated
    state to the dashboard bucket.
    """
    file_name = get_file_name(report_type)

    # Fetch the CSV.
    client = boto3.client('s3', 'us-west-2')
    transfer = boto3.s3.transfer.S3Transfer(client)
    key_path = "{}{}".format(S3_ANALYSIS_BASE_PATH, 'fennec-dashboard/' + file_name)
    try:
        transfer.download_file(S3_ANALYSIS_BUCKET, key_path, file_name)
    except botocore.exceptions.ClientError as e:
        # If the file wasn't there, that's ok. Otherwise, abort!
        if e.response['Error']['Code'] != "404":
            raise e
        else:
            print "Did not find an existing file at '{}'".format(key_path)

def store_new_state(report_type):
    """
    To prevent ACL issues and to prevent files from disappearing from the dashboard bucket,
    we fetch and stage the canonical state from the analysis bucket, only sending the updated
    state to the dashboard bucket.
    """
    file_name = get_file_name(report_type)

    client = boto3.client('s3', 'us-west-2')
    transfer = boto3.s3.transfer.S3Transfer(client)

    # Update the state in the analysis bucket.
    analysis_key_path = "{}{}".format(S3_ANALYSIS_BASE_PATH, 'fennec-dashboard/' + file_name)
    transfer.upload_file(file_name, S3_ANALYSIS_BUCKET, analysis_key_path)

    # Update the state in the dashboard bucket.
    transfer.upload_file(file_name, S3_DASHBOARD_BUCKET, 'fennec-dashboard/' + file_name,
                         extra_args={'ACL': 'bucket-owner-full-control'})

# Utility functions to map our data to CSV and then save it to file.

# In[8]:

def to_csv(r):
    # The key itself is a tuple containing the following data:
    # (os, geo, channel, date)
    data_from_key = r[0]
    formatted_date = dt.datetime.strptime(data_from_key[3], "%Y%m%d").strftime("%Y-%m-%d")
    return ",".join([
        data_from_key[0],  # os
        data_from_key[1],  # geo
        data_from_key[2],  # channel
        formatted_date,
        str(r[1]['actives']),
        str(r[1]['new_records']),
        str(r[1]['d1']),
        str(r[1]['d7']),
        str(r[1]['d30']),
        "0",  # str(r[1]['hours']),
        "0",  # str(r[1]['google']),
        "0",  # str(r[1]['yahoo']),
        "0",  # str(r[1]['bing']),
        "0",  # str(r[1]['other']),
    ])
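
# Illustrative output of to_csv() for one aggregated record (the numbers are made up):
_example_record = (('KitKat (4.4 - 4.4.4)', 'DE', 'release', '20160515'),
                   {'actives': 1200, 'new_records': 150, 'd1': 40, 'd7': 12, 'd30': 3})
assert to_csv(_example_record) == \
    "KitKat (4.4 - 4.4.4),DE,release,2016-05-15,1200,150,40,12,3,0,0,0,0,0"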

def serialize_results(results, report_type, file_suffix=""):
    file_name = get_file_name(report_type, file_suffix)

    skip_csv_header = False
    # If the file is already there, append the new data, but don't print the header again.
    if os.path.exists(file_name):
        print("Omitting the CSV header")
        skip_csv_header = True

    csv_lines = map(to_csv, results.iteritems())
    print("Writing %i new entries" % len(csv_lines))

    with open(file_name, "a") as csv_file:
        # The file didn't exist before this call, so print the header.
        if not skip_csv_header:
            header = "os_version,geo,channel,date,actives,new_records,d1,d7,d30,hours,google,yahoo,bing,other"
            csv_file.write(header.encode('utf8') + "\n")

        # Finally, append the data lines.
        for r in csv_lines:
            #print " ... writing csv line: ", r
            csv_file.write(r.encode('utf8') + "\n")

# ### Execute our script

# In[9]:

def get_mode_from_filename():
    """
    This is hackery-hacky, until bug 1258685 lands: if there is a file named 'summarize_csv_monthly.ipynb',
    we select the monthly operating mode. Otherwise, we stay with weekly.
    """
    return "monthly" if os.path.exists('summarize_csv_monthly.ipynb') else "weekly"

#operating_mode = get_mode_from_filename() # either "weekly" or "monthly"
operating_mode = "weekly"
fraction = 1.0

if operating_mode not in ["weekly", "monthly"]:
    raise ValueError("Unknown operating mode: %s" % operating_mode)

#start_date = None
#end_date = None
start_date = dt.datetime(2016, 5, 8)   # Only use this when backfilling, e.g. dt.datetime(2016, 3, 6)
end_date = dt.datetime(2016, 5, 26)    # Only use this when backfilling, e.g. dt.datetime(2016, 3, 19)

# Run the queries and compute the results.
print "... Running queries"
result = run_queries(operating_mode, start_date, end_date)

# Fetch the previous CSV file from S3.
print "... Fetching previous state"
fetch_previous_state(operating_mode)

# Update it with the new results.
print "... Serializing results"
serialize_results(result, operating_mode)

# Store the updated file back to S3.
print "... Storing new state"
store_new_state(operating_mode)