### Data Extraction
#### Python Extraction Code
To keep the analysis of existing users comparable to the new-profile analysis,
a profile is counted as an existing profile if it
- updated to Firefox Beta 56 between August 8th, 2017 and 14 days thereafter, and
- was on some version before 56.
We then take the three weeks before the profile updated and the three weeks
after it updated.
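As a rough sketch of that window arithmetic (the `windows` helper below is purely illustrative and not part of the extraction pipeline; it simply mirrors the 14-day update window and 3-week observation windows described above):

```{python eval=FALSE}
import datetime

def windows(release_date, update_window=14, weeks=3):
    # Illustrative helper (assumption, not from the original analysis):
    # profiles must update within `update_window` days of the release date,
    # and we collect their daily rows from `weeks` weeks before the earliest
    # allowed update date to `weeks` weeks after the latest one.
    d1 = release_date                                            # earliest allowed update date
    d2 = release_date + datetime.timedelta(days=update_window)   # latest allowed update date
    minSubmission = d1 - datetime.timedelta(days=7 * weeks)      # start of observation window
    maxSubmission = d2 + datetime.timedelta(days=7 * weeks)      # end of observation window
    return d1, d2, minSubmission, maxSubmission

# e.g. windows(datetime.date(2017, 8, 8)) gives an update window of
# 2017-08-08..2017-08-22 and an observation window of 2017-07-18..2017-09-12.
```

The extraction itself starts by reading `clients_daily` into a Spark SQL view: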
```{python eval=FALSE}
import sys
import datetime
import random
import subprocess
import mozillametricstools.common.functions as mozfun
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
cd = spark.read.option("mergeSchema", "true").\
    parquet("s3://telemetry-parquet/clients_daily/v5")
cd.createOrReplaceTempView('cd')
```
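If you want to sanity check the view before running the heavier queries, a quick look (optional, and not part of the original extraction) might be:

```{python eval=FALSE}
# Optional sanity checks (assumed, not in the original gist): confirm the
# expected columns exist and peek at a few rows of clients_daily.
cd.printSchema()
spark.sql("""
    select client_id, activity_date_s3, app_version, active_hours_sum
    from cd
    limit 5
""").show(truncate=False)
```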
The base extraction function pulls per-client daily activity, engagement, search, and crash measures from `clients_daily` for a given submission window and sample id:
```{python eval=FALSE}
def base(minSubmission, maxSubmission, sampleid):
    # Pull the raw daily measures for the submission window and sample id.
    a = spark.sql("""
    select
       client_id as cid,
       activity_date_s3 as date,
       substr(profile_creation_date,1,10) as pcd,
       profile_age_in_days as age,
       app_version as version,
       sessions_started_on_this_day as ns,
       subsession_hours_sum as th,
       active_hours_sum as ah,
       search_count_all_sum as srch,
       scalar_parent_browser_engagement_unique_domains_count_max as meandom,
       scalar_parent_browser_engagement_total_uri_count_sum as turi,
       scalar_parent_browser_engagement_tab_open_event_count_sum as x11,
       scalar_parent_browser_engagement_window_open_event_count_sum as x12,
       crash_submit_success_main_sum as cm,
       crashes_detected_content_sum as cc1,
       shutdown_kill_sum as cc2,
       crashes_detected_plugin_sum as cp1,
       crashes_detected_gmplugin_sum as cp2
    from cd
    where
        activity_date_s3 >= '{minSubmission}'
        AND activity_date_s3 <= '{maxSubmission}'
        AND normalized_channel = 'release'
        AND app_name = 'Firefox'
        AND sample_id in ('{sampleid}')
    """.format(minSubmission=minSubmission, maxSubmission=maxSubmission, sampleid=sampleid))
    # Missing engagement and crash counts are treated as zero.
    a = a.na.fill({
        'meandom': 0, 'turi': 0, 'x11': 0, 'x12': 0, 'cc1': 0, 'cc2': 0, 'cp1': 0, 'cp2': 0, 'cm': 0
    })
    a.createOrReplaceTempView("base0")
    # Derive combined tab/window opens and net content/plugin crash counts.
    b = spark.sql("""
    select
       cid,
       date,
       pcd,
       age,
       version,
       ns,
       th,
       ah,
       srch,
       meandom,
       turi,
       x11 + x12 as ttabwin,
       cm,
       cc1 - cc2 as cc,
       cp1 - cp2 as cp
    from base0
    """)
    b.createOrReplaceTempView("base")
    return b
```
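As a usage sketch (the dates and sample id below are illustrative, not taken from the report), `base` can be exercised on its own:

```{python eval=FALSE}
# Illustrative call (assumed parameters): pull roughly the window used for
# the 55 cohort and inspect the resulting schema and row count.
b = base('2017-07-18', '2017-09-12', '42')
b.printSchema()
b.count()
```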
**Filter 1**: we now want profiles from `base` that *updated* from a version
before `V` to `V`; see the `having` clause.
```{python eval=FALSE}
def filter1(toVersion):
    # For each client, find the first date seen on version V and flag whether
    # the client was ever seen on a version >= V and on a version < V.
    ex1 = spark.sql("""
    select
      cid,
      min(case when substring(version,1,2)='{toVersion}' then date else '2070-01-01' end) as dateUpdated,
      (case when max(substring(version,1,2))>='{toVersion}' then 1 else 0 end) as didUpdateToVersionGE_V,
      (case when min(substring(version,1,2))<'{toVersion}' then 1 else 0 end) as wasonVersionLT_V
    from base
    group by cid
    having didUpdateToVersionGE_V=1 and wasonVersionLT_V=1
    """.format(toVersion=toVersion))
    ex1.createOrReplaceTempView("ex1")
    # Attach the update date to every daily row for the retained clients.
    base2 = spark.sql("""select base.*, ex1.dateUpdated from ex1 left join base on ex1.cid = base.cid""")
    base2.createOrReplaceTempView("base2")
    return base2
```
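A quick diagnostic (hypothetical, not part of the original gist) can confirm how many clients survive the `having` filter and the range of their `dateUpdated` values:

```{python eval=FALSE}
# Hypothetical check on the base2 view produced by filter1("56").
spark.sql("""
    select
        count(distinct cid) as nclients,
        min(dateUpdated)    as earliest_update,
        max(dateUpdated)    as latest_update
    from base2
""").show()
```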
**Filter 2**: we want profiles that updated to this version `V` during the time
period `[D1, D1+14]`.
```{python eval=FALSE}
def filter2(toVersionLabel, d1, d2, dbefore, dafter):
    # Keep clients whose update date falls in [d1, d2] and label each daily
    # row with its offset from the update date. `having` is used so the
    # filter can reference the computed daysSinceUpdate alias.
    base3 = spark.sql("""
    select
      'exist' as persona,
      '{toVersionLabel}' as cohort,
      *,
      datediff(
          from_unixtime(unix_timestamp(date, 'yyyy-MM-dd')),
          from_unixtime(unix_timestamp(dateUpdated, 'yyyy-MM-dd'))
      ) AS daysSinceUpdate
    from base2
    where dateUpdated >= '{d1}' and dateUpdated <= '{d2}'
    having daysSinceUpdate >= -{dbefore} and daysSinceUpdate <= {dafter}
    """.format(toVersionLabel=toVersionLabel, d1=d1, d2=d2, dbefore=dbefore, dafter=dafter))
    base3.createOrReplaceTempView("base3")
    return base3
```
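To see how the retained rows are spread around the update day, an assumed diagnostic over `base3` (not in the original) would be:

```{python eval=FALSE}
# Assumed diagnostic: distribution of daily rows by days since update;
# daysSinceUpdate should run from -dbefore to +dafter.
spark.sql("""
    select daysSinceUpdate, count(*) as nrows, count(distinct cid) as nclients
    from base3
    group by daysSinceUpdate
    order by daysSinceUpdate
""").show(50)
```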
Tie it all together:
```{python eval=FALSE}
def prePost(sampleid, toVersion, toVersionLabel, d1, d2, minSubmission, maxSubmission, dbefore, dafter):
    # These calls register the temp views base, base2 and base3 as side effects.
    base(minSubmission, maxSubmission, sampleid)
    filter1(toVersion)
    filter2(toVersionLabel, d1, d2, dbefore, dafter)
    # crq: any crash that day; active: any usage; period: pre vs post update.
    base4 = spark.sql("""select *,
        case when cm+cc+cp > 0 then 1 else 0 end as crq,
        case when th>0 and turi>0 then 1 else 0 end as active,
        case when date>=dateUpdated then 'postUpdate' else 'preUpdate' end as period
        from base3 where th>=0""")
    base4.createOrReplaceTempView("base4")
    return base4
```
For this report,
```{python eval=FALSE}
releases = {
    '54': datetime.datetime.strptime("2017-06-13", "%Y-%m-%d").date(),
    '55': datetime.datetime.strptime("2017-08-08", "%Y-%m-%d").date(),
    '56': datetime.datetime.strptime("2017-09-25", "%Y-%m-%d").date(),
    'origin': datetime.datetime.strptime("1970-01-01", "%Y-%m-%d").date()
}
v54 = prePost('42', toVersion="54", toVersionLabel="54",
              d1 = releases['54'].strftime("%Y-%m-%d"),
              d2 = (releases['54'] + datetime.timedelta(days=14)).strftime("%Y-%m-%d"),
              minSubmission = (releases['54'] + datetime.timedelta(days=-21)).strftime("%Y-%m-%d"),
              maxSubmission = (releases['54'] + datetime.timedelta(days=21+14)).strftime("%Y-%m-%d"),
              dbefore=21, dafter=21)
v55 = prePost('42', toVersion="55", toVersionLabel="55",
              d1 = releases['55'].strftime("%Y-%m-%d"),
              d2 = (releases['55'] + datetime.timedelta(days=14)).strftime("%Y-%m-%d"),
              minSubmission = (releases['55'] + datetime.timedelta(days=-21)).strftime("%Y-%m-%d"),
              maxSubmission = (releases['55'] + datetime.timedelta(days=21+14)).strftime("%Y-%m-%d"),
              dbefore=21, dafter=21)
v56 = prePost('43', toVersion="56", toVersionLabel="56",
              d1 = releases['56'].strftime("%Y-%m-%d"),
              d2 = (releases['56'] + datetime.timedelta(days=14)).strftime("%Y-%m-%d"),
              minSubmission = (releases['56'] + datetime.timedelta(days=-21)).strftime("%Y-%m-%d"),
              maxSubmission = (releases['56'] + datetime.timedelta(days=21+14)).strftime("%Y-%m-%d"),
              dbefore=21, dafter=21)
cohorts = v56.union(v55)
cohorts = cohorts.cache()
cohorts.createOrReplaceTempView("cohorts")

## Saving the data
O = "s3://mozilla-metrics/user/sguha/tmp/betaexist"
subprocess.call(["aws", "s3", "rm", "--recursive", O])
cohorts.write.parquet(path=O, mode='overwrite')

## Reading the data
cohorts = spark.read.option("mergeSchema", "true").parquet(O)
cohorts.createOrReplaceTempView("cohorts")
cohorts = cohorts.cache()
```
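Once `cohorts` is registered, an illustrative summary (not part of the saved output) confirms cohort sizes and the pre/post split:

```{python eval=FALSE}
# Illustrative summary (assumed, not in the original gist): clients and rows
# per cohort and per pre/post-update period.
spark.sql("""
    select cohort, period, count(distinct cid) as nclients, count(*) as nrows
    from cohorts
    group by cohort, period
    order by cohort, period
""").show()
```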