### Data Extraction
#### Python Extraction Code
To keep the analysis of existing users comparable to the new-profile analysis, a
profile counts as an existing profile if it
- updated to Firefox Beta 56 between August 8th, 2017 and 14 days thereafter, and
- was on some version before 56 beforehand.
For each such profile we take the three weeks before and the three weeks after
the update.
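The eligibility window and the pre/post activity windows are plain date
arithmetic. A minimal sketch, assuming a hypothetical release date `d1` and a
toy per-profile update date (the real values are set in the `releases`
dictionary further down); the actual Spark extraction follows.
```{python eval=FALSE}
import datetime

d1 = datetime.date(2017, 8, 8)               # hypothetical release date D1
d2 = d1 + datetime.timedelta(days=14)        # profiles must update within [D1, D1+14]

dateUpdated = datetime.date(2017, 8, 10)     # toy update date for one profile
pre_window = (dateUpdated - datetime.timedelta(days=21), dateUpdated)    # three weeks before
post_window = (dateUpdated, dateUpdated + datetime.timedelta(days=21))   # three weeks after
```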
```{python eval=FALSE}
import sys
import datetime
import random
import subprocess
import mozillametricstools.common.functions as mozfun
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# Load the clients_daily dataset (v5) and register it as a temp view.
cd = spark.read.option("mergeSchema", "true") \
    .parquet("s3://telemetry-parquet/clients_daily/v5")
cd.createOrReplaceTempView('cd')
```
The base extraction function:
```{python eval=FALSE}
def base(minSubmission, maxSubmission, sampleid):
    # Pull the daily activity rows for the given submission window and sample id.
    a = spark.sql("""
    select
        client_id as cid,
        activity_date_s3 as date,
        substr(profile_creation_date,1,10) as pcd,
        profile_age_in_days as age,
        app_version as version,
        sessions_started_on_this_day as ns,
        subsession_hours_sum as th,
        active_hours_sum as ah,
        search_count_all_sum as srch,
        scalar_parent_browser_engagement_unique_domains_count_max as meandom,
        scalar_parent_browser_engagement_total_uri_count_sum as turi,
        scalar_parent_browser_engagement_tab_open_event_count_sum as x11,
        scalar_parent_browser_engagement_window_open_event_count_sum as x12,
        crash_submit_success_main_sum as cm,
        crashes_detected_content_sum as cc1,
        shutdown_kill_sum as cc2,
        crashes_detected_plugin_sum as cp1,
        crashes_detected_gmplugin_sum as cp2
    from cd
    where
        activity_date_s3 >= '{minSubmission}'
        AND activity_date_s3 <= '{maxSubmission}'
        AND normalized_channel = 'release'
        AND app_name = 'Firefox'
        AND sample_id in ('{sampleid}')
    """.format(minSubmission=minSubmission, maxSubmission=maxSubmission, sampleid=sampleid))
    # Missing engagement/crash counts are treated as zero.
    a = a.na.fill({
        'meandom': 0, 'turi': 0, 'x11': 0, 'x12': 0,
        'cc1': 0, 'cc2': 0, 'cp1': 0, 'cp2': 0, 'cm': 0
    })
    a.createOrReplaceTempView("base0")
    # Derive combined tab/window opens and net content/plugin crash counts.
    b = spark.sql("""
    select
        cid,
        date,
        pcd,
        age,
        version,
        ns,
        th,
        ah,
        srch,
        meandom,
        turi,
        x11 + x12 as ttabwin,
        cm,
        cc1 - cc2 as cc,
        cp1 - cp2 as cp
    from base0
    """)
    b.createOrReplaceTempView("base")
    return b
```
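As a quick usage sketch (the dates and sample id here are illustrative, not the
ones used for the report), `base` returns the daily frame and also registers it
as the `base` temp view:
```{python eval=FALSE}
b = base('2017-07-18', '2017-09-12', '42')   # illustrative window and sample id
b.select('cid', 'date', 'version', 'th', 'ttabwin', 'cc', 'cp').show(5)
spark.sql("select count(distinct cid) as nprofiles from base").show()
```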
**Filter 1**: we now want profiles from `base` that *updated* from a version
before `V` to `V`; see the `having` clause.
```{python eval=FALSE}
def filter1(toVersion):
    # A profile qualifies if it was seen on a version >= V and also on a
    # version < V within the window; dateUpdated is its first day on V.
    ex1 = spark.sql("""
    select
        cid,
        min(case when substring(version,1,2)='{toVersion}' then date else '2070-01-01' end) as dateUpdated,
        (case when max(substring(version,1,2))>='{toVersion}' then 1 else 0 end) as didUpdateToVersionGE_V,
        (case when min(substring(version,1,2))<'{toVersion}' then 1 else 0 end) as wasonVersionLT_V
    from base
    group by cid
    having didUpdateToVersionGE_V=1 and wasonVersionLT_V=1
    """.format(toVersion=toVersion))
    ex1.createOrReplaceTempView("ex1")
    # Attach each profile's update date back onto its daily rows.
    base2 = spark.sql("""
    select base.*, ex1.dateUpdated
    from ex1 left join base on ex1.cid = base.cid
    """)
    base2.createOrReplaceTempView("base2")
    return base2
```
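For intuition, the `having` clause can be mimicked in plain Python on a
profile's observed versions. A toy sketch only; the `qualifies` helper below is
hypothetical and not part of the extraction:
```{python eval=FALSE}
def qualifies(versions_seen, toVersion='56'):
    # substring(version,1,2) in the SQL corresponds to the two-character major here
    majors = [v[:2] for v in versions_seen]
    return max(majors) >= toVersion and min(majors) < toVersion

qualifies(['55.0.3', '56.0'])    # True: seen below 56 and then on 56
qualifies(['56.0', '56.0.1'])    # False: never observed below 56
```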
**Filter 2**: we want profiles that updated to version `V` during the time
period `[D1, D1+14]`.
```{python eval=FALSE}
def filter2(toVersionLabel, d1, d2, dbefore, dafter):
    # Keep profiles whose update date falls in [d1, d2] and restrict each
    # profile's rows to dbefore days before / dafter days after its update.
    base3 = spark.sql("""
    select
        'exist' as persona,
        '{toVersionLabel}' as cohort,
        *,
        datediff(
            from_unixtime(unix_timestamp(date, 'yyyy-MM-dd')),
            from_unixtime(unix_timestamp(dateUpdated, 'yyyy-MM-dd'))
        ) AS daysSinceUpdate
    from base2
    where dateUpdated >= '{d1}' and dateUpdated <= '{d2}'
    having daysSinceUpdate >= -{dbefore} and daysSinceUpdate <= {dafter}
    """.format(toVersionLabel=toVersionLabel, d1=d1, d2=d2, dbefore=dbefore, dafter=dafter))
    base3.createOrReplaceTempView("base3")
    return base3
```
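The sign convention for `daysSinceUpdate` mirrors SQL's
`datediff(date, dateUpdated)`: negative values are days before the profile's
update, positive values are days after, and only rows within
`[-dbefore, dafter]` survive. A small illustration with hypothetical dates:
```{python eval=FALSE}
import datetime

dateUpdated = datetime.date(2017, 8, 10)        # hypothetical update date
for d in [datetime.date(2017, 7, 25), datetime.date(2017, 9, 5)]:
    daysSinceUpdate = (d - dateUpdated).days    # -16 and +26 respectively
    keep = -21 <= daysSinceUpdate <= 21         # only the first date is kept
    print(d, daysSinceUpdate, keep)
```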
Tie it all together:
```{python eval=FALSE}
def prePost(sampleid, toVersion, toVersionLabel, d1, d2, minSubmission, maxSubmission, dbefore, dafter):
    # Each step registers the temp view consumed by the next one.
    base(minSubmission, maxSubmission, sampleid)
    filter1(toVersion)
    filter2(toVersionLabel, d1, d2, dbefore, dafter)
    # Add per-day flags: any crash (crq), active day (active), and whether the
    # row falls before or after the profile's update (period).
    base4 = spark.sql("""
    select *,
        case when cm+cc+cp > 0 then 1 else 0 end as crq,
        case when th>0 and turi>0 then 1 else 0 end as active,
        case when date>=dateUpdated then 'postUpdate' else 'preUpdate' end as period
    from base3 where th >= 0""")
    base4.createOrReplaceTempView("base4")
    return base4
```
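The three derived flags are straightforward; a toy row with hypothetical values
makes the definitions concrete:
```{python eval=FALSE}
row = {'cm': 0, 'cc': 1, 'cp': 0, 'th': 0.5, 'turi': 40,
       'date': '2017-08-12', 'dateUpdated': '2017-08-10'}

crq = 1 if row['cm'] + row['cc'] + row['cp'] > 0 else 0    # any crash that day -> 1
active = 1 if row['th'] > 0 and row['turi'] > 0 else 0     # some hours and some URIs -> 1
period = 'postUpdate' if row['date'] >= row['dateUpdated'] else 'preUpdate'   # ISO dates compare lexically
```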
For this report,
```{python eval=FALSE}
releases = {
    '54': datetime.datetime.strptime("2017-06-13", "%Y-%m-%d").date(),
    '55': datetime.datetime.strptime("2017-08-08", "%Y-%m-%d").date(),
    '56': datetime.datetime.strptime("2017-09-25", "%Y-%m-%d").date(),
    'origin': datetime.datetime.strptime("1970-01-01", "%Y-%m-%d").date()
}
v54 = prePost('42', toVersion="54", toVersionLabel="54",
              d1=releases['54'].strftime("%Y-%m-%d"),
              d2=(releases['54'] + datetime.timedelta(days=14)).strftime("%Y-%m-%d"),
              minSubmission=(releases['54'] + datetime.timedelta(days=-21)).strftime("%Y-%m-%d"),
              maxSubmission=(releases['54'] + datetime.timedelta(days=21 + 14)).strftime("%Y-%m-%d"),
              dbefore=21, dafter=21)
v55 = prePost('42', toVersion="55", toVersionLabel="55",
              d1=releases['55'].strftime("%Y-%m-%d"),
              d2=(releases['55'] + datetime.timedelta(days=14)).strftime("%Y-%m-%d"),
              minSubmission=(releases['55'] + datetime.timedelta(days=-21)).strftime("%Y-%m-%d"),
              maxSubmission=(releases['55'] + datetime.timedelta(days=21 + 14)).strftime("%Y-%m-%d"),
              dbefore=21, dafter=21)
v56 = prePost('43', toVersion="56", toVersionLabel="56",
              d1=releases['56'].strftime("%Y-%m-%d"),
              d2=(releases['56'] + datetime.timedelta(days=14)).strftime("%Y-%m-%d"),
              minSubmission=(releases['56'] + datetime.timedelta(days=-21)).strftime("%Y-%m-%d"),
              maxSubmission=(releases['56'] + datetime.timedelta(days=21 + 14)).strftime("%Y-%m-%d"),
              dbefore=21, dafter=21)
# Combine the 55 and 56 cohorts for the report.
cohorts = v56.union(v55)
cohorts = cohorts.cache()
cohorts.createOrReplaceTempView("cohorts")
## Saving the data
O = "s3://mozilla-metrics/user/sguha/tmp/betaexist"
subprocess.call(["aws", "s3", "rm", "--recursive", O])
cohorts.write.parquet(path=O, mode='overwrite')
## Reading the data
cohorts = spark.read.option("mergeSchema", "true").parquet(O)
cohorts.createOrReplaceTempView("cohorts")
cohorts = cohorts.cache()
```
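With the `cohorts` view registered, a quick sanity check (illustrative, not part
of the extraction) is to count distinct profiles per cohort and period:
```{python eval=FALSE}
spark.sql("""
  select cohort, period, count(distinct cid) as nprofiles
  from cohorts
  group by cohort, period
  order by cohort, period
""").show()
```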