# coding: utf-8
# In[1]:
import ujson as json
import datetime as dt
import os.path
import boto3 #S3
import botocore
import calendar
from os import listdir
from moztelemetry import get_pings, get_pings_properties, get_one_ping_per_client
from pprint import pprint
get_ipython().magic(u'pylab inline')
# Let's pick the report we want to generate here.
# In[2]:
def snap_to_past_sunday(date):
    """ Get the closest previous Sunday on or before the given date. """
    # We need the weeks starting from "Sunday", not from "Monday",
    # so account for that using |(date.weekday() + 1) % 7|.
    return date - dt.timedelta(days=((date.weekday() + 1) % 7))

def snap_to_beginning_of_month(date):
    """ Get the date for the first day of this month. """
    return date.replace(day=1)

def get_last_week_range():
    today = dt.date.today()
    # Get the first day of the past complete week.
    start_of_week = snap_to_past_sunday(today) - dt.timedelta(weeks=1)
    end_of_week = start_of_week + dt.timedelta(days=6)
    return (start_of_week, end_of_week)

def get_last_month_range():
    today = dt.date.today()
    # Get the last day of the previous month.
    end_of_last_month = snap_to_beginning_of_month(today) - dt.timedelta(days=1)
    start_of_last_month = snap_to_beginning_of_month(end_of_last_month)
    return (start_of_last_month, end_of_last_month)
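# A quick sanity check of the date helpers (a minimal sketch; the specific dates
# below were chosen for illustration and are not part of the original run):
assert snap_to_past_sunday(dt.date(2016, 5, 26)) == dt.date(2016, 5, 22)   # Thursday snaps back to Sunday
assert snap_to_past_sunday(dt.date(2016, 5, 22)) == dt.date(2016, 5, 22)   # a Sunday stays put
assert snap_to_beginning_of_month(dt.date(2016, 5, 26)) == dt.date(2016, 5, 1)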
# ### Fetch the core pings
# First thing, pick a submission range. Either last week or last month.
# In[3]:
def fetch_deduped_pings(sub_range):
    core_pings = get_pings(sc,
                           app="Fennec",
                           doc_type="core",
                           source_version="*",
                           submission_date=sub_range,
                           fraction=fraction)

    # We don't need the whole ping. Just get the props we want.
    subset = get_pings_properties(core_pings, ["clientId",
                                               "osversion",
                                               "os",
                                               "profileDate",
                                               "meta/submissionDate",
                                               "meta/geoCountry",
                                               "meta/appUpdateChannel",
                                               "meta/Timestamp",
                                               "meta/documentId"
                                               ])

    # iOS also submits "core" pings, so filter for Android only.
    android = subset.filter(lambda p: p.get("os", "") == "Android")

    # We can (sadly) have duplicated pings. Dedupe by document id.
    return android.map(lambda p: (p["meta/documentId"], p))\
                  .reduceByKey(lambda a, b: a)\
                  .map(lambda t: t[1])
# ### Aggregate the data
#
# Fields documentation:
# * *os_version* - core pings contain the API level; we need to output the codename+version
# * *geo* - the country code of the country originating the pings. We're only interested in some countries; the others are grouped as "Other"
# * *channel* - the product channel
# * *date* - the first day of the aggregated week/month
# * *actives* - the number of clients that were active that day. This used to check 'org.mozilla.appSessions'; can we simply count the number of core pings submitted on that day (deduped by client id)?
# * *new_records* - profile creation date == submission date
#   * This may not hold due to broken clocks, temporary loss of network, ...
# * *d1* - how many clients were active at least once the day after the profile creation date?
# * *d7* - how many clients were active at least once on the 7th day following profile creation?
# * *d30* - how many clients were active at least once on the 30th day following profile creation?
# * *hours* - session duration in hours. Currently 0.
# * *google, yahoo, bing, other* - Currently 0
#
# The d1/d7/d30 metrics strictly imply that the user was seen on that exact day, not the days before (e.g. d7 means the user was seen on profile creation date + 7, not within the [profile creation date, profile creation date + 7] window).
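# Worked example of the strict retention semantics above (a minimal sketch with
# made-up dates, not data from the job): with a profile created on 2016-05-01,
# only a ping submitted exactly on 2016-05-08 counts towards d7; a ping from
# 2016-05-05 counts towards none of the d1/d7/d30 buckets.
_epoch = dt.datetime(1970, 1, 1)
_profile_day = (dt.datetime(2016, 5, 1) - _epoch).days
assert (dt.datetime(2016, 5, 8) - _epoch).days - _profile_day == 7               # counted for d7
assert (dt.datetime(2016, 5, 5) - _epoch).days - _profile_day not in (1, 7, 30)  # counted for nothing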
# In[4]:
COUNTRIES_OF_INTEREST = set(['US','CA','BR','MX','FR','ES','IT','PL','TR','RU','DE','IN','ID','CN','JP','GB'])
def get_country(original_country):
    return original_country if original_country in COUNTRIES_OF_INTEREST else 'Other'
def android_api_level_to_version(api_level):
    """
    The core ping stores the API level, but we need to display the OS version/codename.
    We can map API level -> codename; the related information is available at:
    https://source.android.com/source/build-numbers.html
    """
    API_MAP = {
        '8': 'Froyo (2.2 - 2.2.3)',
        '9': 'Gingerbread (2.3 - 2.3.7)',
        '10': 'Gingerbread (2.3 - 2.3.7)',
        '11': 'Honeycomb (3.0 - 3.2.6)',
        '12': 'Honeycomb (3.0 - 3.2.6)',
        '13': 'Honeycomb (3.0 - 3.2.6)',
        '14': 'Ice Cream Sandwich (4.0 - 4.0.4)',
        '15': 'Ice Cream Sandwich (4.0 - 4.0.4)',
        '16': 'Jelly Bean (4.1 - 4.3.x)',
        '17': 'Jelly Bean (4.1 - 4.3.x)',
        '18': 'Jelly Bean (4.1 - 4.3.x)',
        '19': 'KitKat (4.4 - 4.4.4)',
        '21': 'Lollipop (5.0 - 5.1)',
        '22': 'Lollipop (5.0 - 5.1)',
        '23': 'Marshmallow (6.0)',
    }
    return API_MAP.get(api_level, 'Other')
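# Illustrative check of the mapping (values chosen for the example): known API
# levels map to their codename, anything else falls back to 'Other'.
assert android_api_level_to_version('19') == 'KitKat (4.4 - 4.4.4)'
assert android_api_level_to_version('42') == 'Other'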
# In[5]:
def parse_to_unix_days(s):
    """ Converts YYYYMMDD to days since the Unix epoch. """
    return (dt.datetime.strptime(s, "%Y%m%d") - dt.datetime(1970, 1, 1)).days

def get_file_name(report_type, suffix=""):
    return "fennec-v4-" + report_type + suffix + ".csv"

def safe_increment(p, key):
    """ Safely increments p[key], treating a missing key as 0. """
    p[key] = p.get(key, 0) + 1

def is_new_profile(submission_epoch, profile_epoch):
    """
    Determines if this is a new profile by checking if the submission date
    equals the profile creation date.
    """
    return submission_epoch == profile_epoch
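# Quick illustrative checks for the helpers above (example dates only, matching
# how serialize_results builds the intermediate file names below):
assert parse_to_unix_days("19700101") == 0
assert parse_to_unix_days("19700131") == 30
assert is_new_profile(parse_to_unix_days("20160508"), parse_to_unix_days("20160508"))
assert get_file_name("weekly", "20160508") == "fennec-v4-weekly20160508.csv"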
def get_key(p, segments, report_type):
    """ Build a key-tuple with the dimensions we want to aggregate. """
    dims = []

    # Translate the API version to a name.
    if 'osversion' in segments:
        dims.append(android_api_level_to_version(p.get('osversion')))
    else:
        dims.append('all')

    # Only keep some of the countries; group the rest as "Other".
    if 'meta/geoCountry' in segments:
        dims.append(get_country(p.get('meta/geoCountry')))
    else:
        dims.append('all')

    # Segment by update channel, if requested.
    if 'meta/appUpdateChannel' in segments:
        dims.append(p.get('meta/appUpdateChannel'))
    else:
        dims.append('all')

    # Append the date, at last. In the weekly mode, that's the first day of the
    # submission week. In the monthly mode, that's the first day of the month.
    submission_date = dt.datetime.strptime(p.get("meta/submissionDate"), "%Y%m%d")
    date_string = ""
    if report_type == "monthly":
        date_string = snap_to_beginning_of_month(submission_date).strftime("%Y%m01")
    else:
        date_string = snap_to_past_sunday(submission_date).strftime("%Y%m%d")
    dims.append(date_string)

    return tuple(dims)
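# Illustrative key for a fake ping (made-up values), segmenting only by OS
# version in weekly mode: geo and channel collapse to 'all', and the date snaps
# back to the Sunday that starts the submission week.
_example_ping = {'osversion': '19', 'meta/geoCountry': 'US',
                 'meta/appUpdateChannel': 'release', 'meta/submissionDate': '20160510'}
assert get_key(_example_ping, ['osversion'], 'weekly') == ('KitKat (4.4 - 4.4.4)', 'all', 'all', '20160508')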
def run_query(pings, segments, report_type):
    """
    Aggregate the pings over the dimensions in "segments". We start by generating a key for each
    ping by chaining the values of the dimensions of interest. If we don't care about a particular
    dimension, its value is set to "all".
    All the pings belonging to a key are aggregated together.
    """
    # Segment the data by indexing them by dimensions.
    segmented_pings = pings.map(lambda p: (get_key(p, segments, report_type), p))

    # We require the profile age to measure retention. Filter out the pings that don't have it.
    filtered = segmented_pings.filter(lambda p: p[1].get("profileDate", None) is not None)
    # print "Got {} pings after filtering invalid profileDate(s)".format(filtered.count())

    # For metrics like d1, d7, d30, and new_records we need only one core ping per client, per day.
    # Generate a new RDD containing only one ping per client, for each day, within the segment:
    # Step 1 - Append the client id and submission date to the index key
    # Step 2 - ReduceByKey so that we get only one ping per day per client
    # Step 3 - Strip off the client id/submission date from the index key
    one_per_day = filtered.map(lambda p: ((p[0], p[1].get("clientId"), p[1].get("meta/submissionDate")), p[1]))\
                          .reduceByKey(lambda a, b: a)\
                          .map(lambda p: (p[0][0], p[1]))

    # Compute the aggregated counts.
    def retention_seq(acc, v):
        if not acc:
            acc = {}

        # **** IMPORTANT NOTICE ****
        #
        # Please note that retention *WILL* be broken by broken client clocks. This is most
        # certainly affected by clock skew. Once we have clock skew data from the clients
        # we might consider revisiting this to adjust the profileDate accordingly.
        # Another option would be to fetch, in some way, the first ping ever submitted by the client,
        # but that would not be practical due to the data retention policies (luckily).
        submission_epoch = parse_to_unix_days(v['meta/submissionDate'])

        # Check if this ping is from a new profile. If so, increment "new_records".
        if is_new_profile(submission_epoch, v['profileDate']):
            safe_increment(acc, 'new_records')

        # Evaluate the d1, d7, d30 retention metrics. First, get the delta between
        # the submission date and the profile creation date.
        days_after_creation = submission_epoch - v['profileDate']

        # Is the user still engaged after 1 day (d1)?
        if days_after_creation == 1:
            safe_increment(acc, 'd1')
        # And after 7 days (d7)?
        elif days_after_creation == 7:
            safe_increment(acc, 'd7')
        # And after 30 days (d30)?
        elif days_after_creation == 30:
            safe_increment(acc, 'd30')

        return acc

    def cmb(v1, v2):
        # Combine the counts from the two partial dictionaries.
        return { k: v1.get(k, 0) + v2.get(k, 0) for k in set(v1) | set(v2) }

    retention_defaults = {
        'new_records': 0,
        'actives': 0,
        'd1': 0,
        'd7': 0,
        'd30': 0,
    }
    aggregated_retention = one_per_day.aggregateByKey(retention_defaults, retention_seq, cmb)

    # For each segment, count how many active clients there are:
    def count_actives(acc, v):
        acc["actives"] = acc["actives"] + 1
        return acc

    # We aggregate the active user count in a dict to make joining easier.
    actives_per_segment = segmented_pings.map(lambda r: ((r[0], r[1].get("clientId")), 1))\
                                         .reduceByKey(lambda x, y: x)\
                                         .map(lambda r: (r[0][0], 1))\
                                         .aggregateByKey({"actives": 0}, count_actives, cmb)

    # Join the RDDs.
    merged = aggregated_retention.join(actives_per_segment).mapValues(lambda r: cmb(r[0], r[1]))

    return merged
# To build a single CSV file, we execute a series of queries and then serialize the output.
# In[6]:
def run_queries(report_type, start_date=None, end_date=None):
    """
    This function has 3 operating modes:

    1. "weekly", which aggregates the data from the last full week and appends the results to the weekly CSV;
    2. "monthly", which aggregates the last full month and appends the results to the monthly CSV;
    3. "backfill", which, given a start and end date, performs weekly or monthly aggregation over that period
       and appends to the CSV file.
    """

    # Each entry represents a different query over a set of dimensions of interest.
    QUERIES = [
        ['osversion', 'meta/geoCountry', 'meta/appUpdateChannel'],
        ['osversion', 'meta/appUpdateChannel'],
        ['osversion', 'meta/geoCountry'],
        ['meta/geoCountry', 'meta/appUpdateChannel'],
        ['osversion'],
        ['meta/geoCountry'],
        ['meta/appUpdateChannel'],
        []
    ]

    # Check start_date and end_date for validity. If invalid, set them to the last week/month.
    date_range = get_last_month_range() if report_type == "monthly" else get_last_week_range()
    if start_date is not None and end_date is not None:
        sd = snap_to_past_sunday(start_date) if report_type == "weekly" else snap_to_beginning_of_month(start_date)
        date_range = (sd, end_date)

    delta = date_range[1] - date_range[0]
    print "Running summary analysis type {} over {} days for {} to {}".format(report_type, delta.days, *date_range)

    # Split the submission period into chunks, so we don't run out of resources while aggregating.
    date_chunks = []
    chunk_start = date_range[0]

    print "Breaking date range into chunks:"
    while chunk_start < date_range[1]:
        # Compute the end of this time chunk.
        chunk_end = None
        if report_type == "monthly":
            chunk_end = chunk_start.replace(day=calendar.monthrange(chunk_start.year, chunk_start.month)[1])
        else:
            chunk_end = chunk_start + dt.timedelta(days=6)

        print "* {} to {}".format(chunk_start.strftime("%Y%m%d"), chunk_end.strftime("%Y%m%d"))
        date_chunks.append((chunk_start, chunk_end))

        # Move on to the next chunk: just add one day to the end of the last month or week.
        chunk_start = chunk_end + dt.timedelta(days=1)

    # The results will be in a dict of the form {(os, geo, channel, date): dict, ...}.
    results = {}
    for chunk_start, chunk_end in date_chunks:
        # Fetch the pings we need.
        submissions_range = (chunk_start.strftime("%Y%m%d"), chunk_end.strftime("%Y%m%d"))
        print "\nFetching pings for {} to {}".format(*submissions_range)
        deduped = fetch_deduped_pings(submissions_range)
        #print "Fetched {} pings".format(deduped.count())

        chunk_results = {}
        for query in QUERIES:
            print " * Running query over dimensions: %s" % ", ".join(query)
            query_result = dict(run_query(deduped, query, report_type).collect())
            #pprint(query_result)
            # Append this query's results to the results for this chunk.
            chunk_results.update(query_result)

        # Serialize intermediate results to file, so we don't start from scratch if the batch fails.
        # We append the week/month at the end of the file name.
        serialize_results(chunk_results, report_type, submissions_range[0])

        # Append this chunk's results to the full result dict. We assume the keys DO NOT collide.
        results.update(chunk_results)

    return results
# ### CSV and S3 utility functions.
# Some utility functions to read/write from the S3 store.
# In[7]:
S3_ANALYSIS_BUCKET = "net-mozaws-prod-us-west-2-pipeline-analysis"
S3_ANALYSIS_BASE_PATH = "aplacitelli/"
S3_DASHBOARD_BUCKET = "net-mozaws-prod-metrics-data"

def fetch_previous_state(report_type):
    """
    To prevent ACL issues and to prevent files from disappearing from the dashboard bucket,
    we fetch and stage the canonical state from the analysis bucket, only sending the updated
    state to the dashboard bucket.
    """
    file_name = get_file_name(report_type)

    # Fetch the CSV.
    client = boto3.client('s3', 'us-west-2')
    transfer = boto3.s3.transfer.S3Transfer(client)
    key_path = "{}{}".format(S3_ANALYSIS_BASE_PATH, 'fennec-dashboard/' + file_name)
    try:
        transfer.download_file(S3_ANALYSIS_BUCKET, key_path, file_name)
    except botocore.exceptions.ClientError as e:
        # If the file wasn't there, that's ok. Otherwise, abort!
        if e.response['Error']['Code'] != "404":
            raise e
        else:
            print "Did not find an existing file at '{}'".format(key_path)

def store_new_state(report_type):
    """
    To prevent ACL issues and to prevent files from disappearing from the dashboard bucket,
    we fetch and stage the canonical state from the analysis bucket, only sending the updated
    state to the dashboard bucket.
    """
    file_name = get_file_name(report_type)

    client = boto3.client('s3', 'us-west-2')
    transfer = boto3.s3.transfer.S3Transfer(client)

    # Update the state in the analysis bucket.
    analysis_key_path = "{}{}".format(S3_ANALYSIS_BASE_PATH, 'fennec-dashboard/' + file_name)
    transfer.upload_file(file_name, S3_ANALYSIS_BUCKET, analysis_key_path)

    # Update the state in the dashboard bucket.
    transfer.upload_file(file_name, S3_DASHBOARD_BUCKET, 'fennec-dashboard/' + file_name,
                         extra_args={'ACL': 'bucket-owner-full-control'})
# Utility functions to map our data to CSV and then save it to file.
# In[8]:
def to_csv(r):
    # The key itself is a tuple containing the following data:
    # (os, geo, channel, date)
    data_from_key = r[0]
    formatted_date = dt.datetime.strptime(data_from_key[3], "%Y%m%d").strftime("%Y-%m-%d")

    return ",".join([
        data_from_key[0], # os
        data_from_key[1], # geo
        data_from_key[2], # channel
        formatted_date,
        str(r[1]['actives']),
        str(r[1]['new_records']),
        str(r[1]['d1']),
        str(r[1]['d7']),
        str(r[1]['d30']),
        "0", # str(r[1]['hours']),
        "0", # str(r[1]['google']),
        "0", # str(r[1]['yahoo']),
        "0", # str(r[1]['bing']),
        "0", # str(r[1]['other']),
    ])
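# Illustrative CSV line for one aggregated record (made-up numbers, not real
# dashboard data): the key supplies os/geo/channel/date, the value supplies the
# counts, and the unimplemented columns are filled with zeros.
_example_record = (('KitKat (4.4 - 4.4.4)', 'US', 'release', '20160508'),
                   {'actives': 10, 'new_records': 2, 'd1': 1, 'd7': 0, 'd30': 0})
assert to_csv(_example_record) == 'KitKat (4.4 - 4.4.4),US,release,2016-05-08,10,2,1,0,0,0,0,0,0,0'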
def serialize_results(results, report_type, file_suffix=""):
file_name = get_file_name(report_type, file_suffix)
skip_csv_header = False
# If the file is already there, append the new data, but don't print the header again.
if os.path.exists(file_name):
print("Omitting the CSV header")
skip_csv_header = True
csv_lines = map(to_csv, results.iteritems())
print("Writing %i new entries" % len(csv_lines))
with open(file_name, "a") as csv_file:
# The file didn't exist before this call, print the header.
if not skip_csv_header:
header = "os_version,geo,channel,date,actives,new_records,d1,d7,d30,hours,google,yahoo,bing,other"
csv_file.write(header.encode('utf8') + "\n")
# Finally append the data lines.
for r in csv_lines:
#print " ... writing csv line: ", r
csv_file.write(r.encode('utf8') + "\n")
# ### Execute our script
# In[9]:
def get_mode_from_filename():
    """
    This is hackery-hacky, until bug 1258685 lands: if there is a file named 'summarize_csv_monthly.ipynb',
    we select the monthly operating mode. Otherwise, we stay with weekly.
    """
    return "monthly" if os.path.exists('summarize_csv_monthly.ipynb') else "weekly"

#operating_mode = get_mode_from_filename() # either "weekly" or "monthly"
operating_mode = "weekly"
fraction = 1.0

if operating_mode not in ["weekly", "monthly"]:
    raise ValueError("Unknown operating mode: %s" % operating_mode)

#start_date = None
#end_date = None
start_date = dt.datetime(2016, 5, 8)  # Only use this when backfilling, e.g. dt.datetime(2016,3,6)
end_date = dt.datetime(2016, 5, 26)   # Only use this when backfilling, e.g. dt.datetime(2016,3,19)

# Run the queries and compute the results.
print "... Running queries"
result = run_queries(operating_mode, start_date, end_date)

# Fetch the previous CSV file from S3.
print "... Fetching previous state"
fetch_previous_state(operating_mode)

# Update it with the new results.
print "... Serializing results"
serialize_results(result, operating_mode)

# Store the updated file back to S3.
print "... Storing new state"
store_new_state(operating_mode)