import pyspark
from pyspark.sql import SQLContext

# Spin up a Spark context and a SQLContext for the parquet reads below.
sc = pyspark.SparkContext()
sqlContext = SQLContext(sc)
print(sqlContext)

import readline  # loaded before rpy2 for its side effect on the embedded R
from rpy2.robjects.packages import importr
from rpy2.robjects import pandas2ri
import rpy2.robjects as ro
import sys
import datetime
import json
import random
import subprocess
import pandas as pd
# Draw one random sample_id (seeded for reproducibility) and quote it for the
# SQL IN clause; one sample_id bucket is roughly a 1% sample of clients.
random.seed(10)
sampleids = [random.randint(1, 100) for x in range(1)]
samplechar = ["'{}'".format(str(x)) for x in sampleids]

ms = sqlContext.read.load("s3://telemetry-parquet/main_summary/v3", "parquet")
ms2 = ms.select("sample_id", "client_id", "submission_date", "search_counts",
                "country", "locale", "os", "os_version", "app_name", "normalized_channel")
ms3 = ms2.filter("app_name='Firefox' and normalized_channel='release' and sample_id in ({})".format(",".join(samplechar)))
sqlContext.registerDataFrameAsTable(ms3, "ms3")
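# Optional sanity check (an addition for clarity, not in the original script):
# confirm the selected columns are as expected before building the derived
# table below. printSchema() is cheap -- it triggers no Spark job.
ms3.printSchema()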
# Recode country, locale, and OS/version into coarse coverage buckets.
ms4 = sqlContext.sql("""
select sample_id as sid,
       client_id,
       submission_date as date,
       search_counts,
       case when country in ('US','DE','BR','FR','IN','ID','RU','PL','IT','GB','ES','CN','CA','JP','MX') then country
            else 'others'
       end as covgeo,
       case when locale in ('en-US','de','fr','es-ES','pt-BR','ru','pl','it','en-GB','zh-CN','ja','es-MX','nl','es-AR','tr') then locale
            else 'others'
       end as covlocale,
       case when os = 'Linux' then 'Linux'
            when os = 'Darwin' and substr(os_version,1,2)='10' then 'osx_SnowLeopard'
            when os = 'Darwin' and substr(os_version,1,2)='11' then 'osx_Lion'
            when os = 'Darwin' and substr(os_version,1,2)='12' then 'osx_MountainLion'
            when os = 'Darwin' and substr(os_version,1,2)='13' then 'osx_Mavericks'
            when os = 'Darwin' and substr(os_version,1,2)='14' then 'osx_Yosemite'
            when os = 'Darwin' and substr(os_version,1,2)='15' then 'osx_ElCapitan'
            when os = 'Darwin' then 'osx_Others'
            when os = 'Windows_NT' and instr(os_version,'6.1')=1 then 'win_7'
            when os = 'Windows_NT' and instr(os_version,'6.0')=1 then 'win_Vista'
            when os = 'Windows_NT' and instr(os_version,'5.1')=1 then 'win_XP'
            when os = 'Windows_NT' and instr(os_version,'5.2')=1 then 'win_XP'
            when os = 'Windows_NT' and instr(os_version,'6.2')=1 then 'win_8'
            when os = 'Windows_NT' and instr(os_version,'6.3')=1 then 'win_8'
            when os = 'Windows_NT' and instr(os_version,'10.0')=1 then 'win_10'
            when os = 'Windows_NT' then 'win_Others'
            when os in ('Windows_95', 'Windows_98') then 'win_Others'
            else 'others'
       end as covosname
from ms3
""")
sqlContext.registerDataFrameAsTable(ms4, "ms4")
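# Illustrative cross-check (hypothetical helper, not part of the pipeline):
# the Darwin major-version -> OS X codename mapping used by the CASE above,
# written as a pure Python function so it can be spot-checked locally.
_OSX_CODENAMES = {"10": "osx_SnowLeopard", "11": "osx_Lion",
                  "12": "osx_MountainLion", "13": "osx_Mavericks",
                  "14": "osx_Yosemite", "15": "osx_ElCapitan"}
def osx_codename(os_version):
    # substr(os_version, 1, 2) in the SQL corresponds to os_version[:2] here.
    return _OSX_CODENAMES.get(os_version[:2], "osx_Others")
assert osx_codename("13.4.0") == "osx_Mavericks"
assert osx_codename("16.0.0") == "osx_Others"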
# Pull the checkpoint log (the last processed date) from S3, if it exists.
try:
    subprocess.call(["aws", "s3", "cp", "s3://mozilla-metrics/sguha/fxlove.json", "/tmp/"])
    with open('/tmp/fxlove.json') as json_dat:
        fxlove = json.load(json_dat)
    print("JSON log file found")
except Exception:
    print("JSON log file not found")
    fxlove = {}

# Resume from the checkpointed date, or from 2016-07-01 on the first run.
if fxlove.get("fromDate") is None:
    fromDate = datetime.datetime.strptime("2016-07-01", "%Y-%m-%d").date()
else:
    fromDate = datetime.datetime.strptime(fxlove["fromDate"], "%Y-%m-%d").date()
currentDate = datetime.datetime.now().date()
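# Note (added for clarity): two date formats are in play. The JSON checkpoint
# stores 'YYYY-MM-DD', while main_summary's submission_date is a 'YYYYMMDD'
# string, hence the "%Y%m%d" reformatting in the queries below. Illustration:
#   datetime.date(2016, 7, 1).strftime("%Y-%m-%d")  -> '2016-07-01'
#   datetime.date(2016, 7, 1).strftime("%Y%m%d")    -> '20160701'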
################################################################################
## Get DAU for ALL And Search Counts
################################################################################
daudate = sqlContext.sql("""
SELECT
    covgeo,
    covosname,
    date,
    count(distinct(client_id)) as n
FROM ms4
WHERE date > '{}' and date <= '{}'
GROUP BY date, covgeo, covosname
ORDER BY date, covgeo, covosname
""".format(fromDate.strftime("%Y%m%d"), currentDate.strftime("%Y%m%d")))
daudate = daudate.toPandas()
# No new submission dates since the last checkpoint: nothing to do.
if len(daudate) == 0:
    sys.exit(0)
################################################################################
## Get Search Information
################################################################################
# search_counts is an array of structs; explode its count field to one row per
# search-count entry, then aggregate searchers (ns) and total searches.
side1 = sqlContext.sql("select covgeo, covosname, date, client_id, explode(search_counts.count) as s from ms4")
sqlContext.registerDataFrameAsTable(side1, "side1")
srcdate = sqlContext.sql("""
SELECT
    covgeo,
    covosname,
    date,
    count(distinct(client_id)) as ns,
    sum(s) as searches
FROM side1
WHERE date > '{}' and date <= '{}'
GROUP BY date, covgeo, covosname
ORDER BY date, covgeo, covosname
""".format(fromDate.strftime("%Y%m%d"), currentDate.strftime("%Y%m%d")))
srcdate = srcdate.toPandas()
daudate = daudate.merge(srcdate, on=['date', 'covgeo', 'covosname'])
if len(daudate) == 0:
    sys.exit(0)
################################################################################
## Now Get WAU Information (Just Like Above)
################################################################################
days = [fromDate + datetime.timedelta(days=x + 1) for x in range((currentDate - fromDate).days)]
# Compute the rolling metrics only on Mon/Wed/Fri checkpoints; materialize as
# a list so len(days) below also works on Python 3.
days = [d for d in days if d.strftime("%A") in ("Monday", "Wednesday", "Friday")]
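# A quick look at the checkpoint schedule (this print is an addition for
# clarity, not in the original). With fromDate = 2016-07-01 (a Friday), the
# first entries of `days` are 2016-07-04 (Mon), 2016-07-06 (Wed), 2016-07-08 (Fri).
print("computing WAU/MAU/cAR for {} checkpoint days".format(len(days)))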
collector = []
for i, d in enumerate(days):
    print("{} ({}/{})".format(d, i, len(days)))
    # WAU: distinct clients over the trailing 7 days ending at checkpoint d.
    x = sqlContext.sql("""
    SELECT
        covgeo,
        covosname,
        '{}' as date,
        count(distinct(client_id)) as n7
    FROM ms4
    WHERE date > '{}' and date <= '{}'
    GROUP BY covgeo, covosname
    ORDER BY covgeo, covosname
    """.format(d.strftime("%Y%m%d"),
               (d - datetime.timedelta(days=7)).strftime("%Y%m%d"),
               d.strftime("%Y%m%d")))
    x = x.toPandas()
    # MAU: distinct clients over the trailing 28 days ending at d.
    y = sqlContext.sql("""
    SELECT
        covgeo,
        covosname,
        '{}' as date,
        count(distinct(client_id)) as n28
    FROM ms4
    WHERE date > '{}' and date <= '{}'
    GROUP BY covgeo, covosname
    ORDER BY covgeo, covosname
    """.format(d.strftime("%Y%m%d"),
               (d - datetime.timedelta(days=28)).strftime("%Y%m%d"),
               d.strftime("%Y%m%d")))
    y = y.toPandas()
    # cAR windows: the denominator is clients seen in [d-27, d-7]; the
    # numerator is the subset of those also seen in [d-6, d].
    sec1 = (d - datetime.timedelta(days=27)).strftime("%Y%m%d")
    sec2 = (d - datetime.timedelta(days=7)).strftime("%Y%m%d")
    sec3 = (d - datetime.timedelta(days=6)).strftime("%Y%m%d")
    sec4 = d.strftime("%Y%m%d")
    cardenom = sqlContext.sql("""
    SELECT
        client_id,
        covgeo,
        covosname,
        '{}' as date
    FROM ms4
    WHERE date >= '{}' and date <= '{}'
    GROUP BY client_id, covgeo, covosname
    ORDER BY client_id, covgeo, covosname
    """.format(sec4, sec1, sec2))
    sqlContext.registerDataFrameAsTable(cardenom, "cardenom")
    carnump1 = sqlContext.sql("""
    SELECT
        client_id,
        covgeo,
        covosname,
        '{}' as date
    FROM ms4
    WHERE date >= '{}' and date <= '{}'
    GROUP BY client_id, covgeo, covosname
    ORDER BY client_id, covgeo, covosname
    """.format(sec4, sec3, sec4))
    sqlContext.registerDataFrameAsTable(carnump1, "carnump1")
    # Clients present in both windows, per (geo, os) cell.
    sqlContext.sql("""
    select carnump1.client_id, carnump1.covgeo, carnump1.covosname, carnump1.date
    from carnump1 join cardenom
      on carnump1.client_id = cardenom.client_id
     and carnump1.covgeo = cardenom.covgeo
     and carnump1.covosname = cardenom.covosname
    """).registerTempTable("t1")
    sqlContext.sql("""
    select covgeo, covosname, date, count(distinct(client_id)) as cartop
    from t1 group by covgeo, covosname, date
    """).registerTempTable("t11")
    sqlContext.sql("""
    select covgeo, covosname, date, count(distinct(client_id)) as carbot
    from cardenom group by covgeo, covosname, date
    """).registerTempTable("t22")
    z = sqlContext.sql("""
    select t11.covgeo, t11.covosname, t11.date, cartop, carbot
    from t11 join t22
      on t11.covgeo = t22.covgeo
     and t11.covosname = t22.covosname
     and t11.date = t22.date
    """).toPandas()
    collector.append((x, y, z))
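# At this point each element of `collector` holds three pandas frames for one
# checkpoint day d: x (WAU, column n7), y (MAU, column n28), and z (the cAR
# numerator cartop and denominator carbot).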
# Stitch the per-checkpoint frames together: start from MAU (s[1]), then
# left-join WAU (s[0]) and the cAR counts (s[2]) on the (geo, os, date) cell.
collector2 = []
for s in collector:
    k = s[1].merge(s[0], on=['covgeo', 'covosname', 'date'], how='left')
    k2 = k.merge(s[2], on=['covgeo', 'covosname', 'date'], how='left')
    collector2.append(k2)
waumau = pd.concat(collector2)
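# `waumau` now has one row per (date, covgeo, covosname) with n28, n7, cartop,
# and carbot; the left merges keep every MAU cell even when the corresponding
# WAU or cAR cell is missing for that day.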
# import cPickle as pickle
# pickle.dump([daudate, collector], open("/tmp/save.pb", "wb"))
# subprocess.call(["aws","s3","cp","/tmp/save.pb","s3://mozilla-metrics/sguha/"])
# subprocess.call(["aws","s3","cp","s3://mozilla-metrics/sguha/save.pb","/tmp/"])
# pickle.load(open("/tmp/save.pb", "rb"))
################################################################################
## We now import some R packages and send daudate to R
## You need to run spark.rhipe(cl) for the data.table package
################################################################################
importr("base")
importr("utils")
importr("data.table")
pandas2ri.activate()
ro.r("options(stringsAsFactors=FALSE)")
ro.globalenv['daudate'] = pandas2ri.py2ri(daudate)  # send daudate to R
ro.r("""
Sum <- function(s) sum(s, na.rm=TRUE)
daudate <- data.table(daudate)
daudate[, ":="(n=as.numeric(n), ns=as.numeric(ns), searches=as.numeric(searches))]
## Add 'all' rollups: across both geo and os, across os only, across geo only.
daudate <- rbind(daudate,
    daudate[, list(covgeo='all', covosname='all', n=Sum(n), ns=Sum(ns), searches=Sum(searches)), by=date],
    daudate[, list(covosname='all', n=Sum(n), ns=Sum(ns), searches=Sum(searches)), by=list(date, covgeo)],
    daudate[, list(covgeo='all', n=Sum(n), ns=Sum(ns), searches=Sum(searches)), by=list(date, covosname)])
## Trailing 7-day moving average (stats::filter, sides=1) of DAU per cell.
daudate[, averageDAU := as.numeric(filter(n, rep(1, min(7, .N))/min(7, .N), sides=1)), by=list(covgeo, covosname)]
""")
ro.globalenv['waumau'] = pandas2ri.py2ri(waumau)  # send waumau to R
ro.r("""
waumau <- data.table(waumau)
waumau <- waumau[, ":="(n7=as.numeric(n7), n28=as.numeric(n28),
                        cartop=as.numeric(cartop), carbot=as.numeric(carbot))]
## Add 'all' rollups, mirroring the daudate rollups above.
waumau <- rbind(waumau,
    waumau[, list(covgeo='all', covosname='all', n7=Sum(n7), n28=Sum(n28), cartop=Sum(cartop), carbot=Sum(carbot)),
           by=date],
    waumau[, list(covosname='all', n7=Sum(n7), n28=Sum(n28), cartop=Sum(cartop), carbot=Sum(carbot)),
           by=list(date, covgeo)],
    waumau[, list(covgeo='all', n7=Sum(n7), n28=Sum(n28), cartop=Sum(cartop), carbot=Sum(carbot)),
           by=list(date, covosname)])
waumau2 <- merge(waumau, daudate[, list(covgeo, covosname, date, averageDAU)], by=c("covosname", "covgeo", "date"))
waumau2[, ":="(er=averageDAU/n28, car=cartop/carbot)]
waumau2[, ":="(carbot=NULL, cartop=NULL)]
""")
ro.r("""
## Try to pull the previous run's Rdata from S3 into environment p;
## on failure (e.g. the first run) p simply stays empty.
p <- new.env()
tryCatch({
    system("aws s3 cp s3://mozilla-metrics/sguha/fxlove.Rdata /tmp/")
    load("/tmp/fxlove.Rdata", envir=p)
}, error=function(e) NULL)
""")
ro.r("""
library(data.table)
## Append the new rows to the historical series and de-duplicate on the key.
if(!is.null(p$daudate)){
    daudate <- rbind(p$daudate, daudate)
    setkeyv(daudate, c("date", "covosname", "covgeo"))
    daudate <- unique(daudate)
}
""")
ro.r("""
if(!is.null(p$waumau2)){
    waumau2 <- rbind(p$waumau2, waumau2)
    setkeyv(waumau2, c("date", "covosname", "covgeo"))
    waumau2 <- unique(waumau2)
}
""")
ro.r("""
save(waumau2, daudate, file="/tmp/fxlove.Rdata")
write.csv(waumau2, file="/tmp/fxlovewaumau2.csv", row.names=FALSE)
write.csv(daudate, file="/tmp/fxlovedaudate.csv", row.names=FALSE)
system("aws s3 cp /tmp/fxlove.Rdata s3://mozilla-metrics/sguha/")
system("aws s3 cp /tmp/fxlovewaumau2.csv s3://mozilla-metrics/sguha/")
system("aws s3 cp /tmp/fxlovedaudate.csv s3://mozilla-metrics/sguha/")
""")
# Advance the checkpoint so the next run picks up where this one ended.
fxlove = {'fromDate': currentDate.strftime("%Y-%m-%d")}
with open('/tmp/fxlove.json', 'w') as outfile:
    json.dump(fxlove, outfile)
subprocess.call(["aws", "s3", "cp", "/tmp/fxlove.json", "s3://mozilla-metrics/sguha/"])
sys.exit(0)