Skip to content

Instantly share code, notes, and snippets.

@saptarshiguha
Created January 9, 2017 18:41
Show Gist options
  • Save saptarshiguha/abd68d6d1c1dccfc22f3a4e7baf06679 to your computer and use it in GitHub Desktop.
Save saptarshiguha/abd68d6d1c1dccfc22f3a4e7baf06679 to your computer and use it in GitHub Desktop.
# ETL job (run as a PySpark script): computes daily/weekly/monthly active
# users, search volume and week-over-week retention cuts of Firefox release
# users from the telemetry main_summary dataset, then post-processes and
# checkpoints the results in R via rpy2.
import pyspark
import py4j
from pyspark import SparkContext
from pyspark.sql import SQLContext
sc = pyspark.SparkContext()
sqlContext = SQLContext(sc)
print(sqlContext)
import readline  # NOTE(review): presumably imported before rpy2 for line-editing support -- confirm
from rpy2.robjects.packages import importr
from rpy2.robjects import pandas2ri
import rpy2.robjects as ro
import sys
import datetime
import json
import random
import subprocess
import time
import pandas as pd
random.seed(10)  # fixed seed so the sampled sample_id bucket is stable across runs
# Draw one sample_id bucket to subsample clients.
# NOTE(review): randint(1,100) is inclusive at both ends -- confirm that
# 100 is actually a populated sample_id value in main_summary.
sampleids = [ random.randint(1,100) for x in range(1)]
samplechar = [ "'{}'".format(str(x)) for x in sampleids]  # quoted for the SQL IN clause below
ms = sqlContext.read.load("s3://telemetry-parquet/main_summary/v3", "parquet")
# Keep only the columns used downstream.
ms2 = ms.select("sample_id","client_id","submission_date","search_counts","country","locale","os","os_version","app_name","normalized_channel")
# Restrict to Firefox release-channel pings inside the sampled bucket(s).
ms3 = ms2.filter("app_name='Firefox' and normalized_channel='release' and sample_id in ({})".format( ",".join(samplechar)))
sqlContext.registerDataFrameAsTable(ms3, "ms3")
# Recode raw fields into coarse covariates: the listed top countries and
# locales are kept as-is and everything else becomes 'others'; Darwin and
# Windows_NT version prefixes are mapped to named OS releases.
ms4 = sqlContext.sql("""
select sample_id as sid,
client_id,
submission_date as date,
search_counts,
case when country in ('US','DE','BR','FR','IN','ID','RU','PL','IT','GB','ES','CN','CA','JP','MX') then country
else 'others'
end as covgeo,
case when locale in ('en-US','de','fr','es-ES','pt-BR','ru','pl','it','en-GB','zh-CN','ja','es-MX','nl','es-AR','tr') then locale
else 'others'
end as covlocale,
case when os = 'Linux' then 'Linux'
when os = 'Darwin' and substr(os_version,1,2)='10' then 'osx_SnowLeopard'
when os = 'Darwin' and substr(os_version,1,2)='11' then 'osx_Lion'
when os = 'Darwin' and substr(os_version,1,2)='12' then 'osx_MountainLion'
when os = 'Darwin' and substr(os_version,1,2)='13' then 'osx_Mavericks'
when os = 'Darwin' and substr(os_version,1,2)='14' then 'osx_Yosemite'
when os = 'Darwin' and substr(os_version,1,2)='15' then 'osx_ElCapitan'
when os = 'Darwin' then 'osx_Others'
when os = 'Windows_NT' and instr(os_version,'6.1')=1 then 'win_7'
when os = 'Windows_NT' and instr(os_version,'6.0')=1 then 'win_Vista'
when os = 'Windows_NT' and instr(os_version,'5.1')=1 then 'win_XP'
when os = 'Windows_NT' and instr(os_version,'5.2')=1 then 'win_XP'
when os = 'Windows_NT' and instr(os_version,'6.2')=1 then 'win_8'
when os = 'Windows_NT' and instr(os_version,'6.3')=1 then 'win_8'
when os = 'Windows_NT' and instr(os_version,'10.0')=1 then 'win_10'
when os = 'Windows_NT' then 'win_Others'
when os in ('Windows_95', 'Windows_98') then 'win_Others'
else 'others'
end as covosname
from ms3
""")
sqlContext.registerDataFrameAsTable(ms4,"ms4")
# Best-effort fetch of the job checkpoint (records the last processed date)
# from S3; any failure falls back to the default start date below.
try:
    subprocess.call(["aws","s3","cp","s3://mozilla-metrics/sguha/fxlove.json","/tmp/"])
    with open('/tmp/fxlove.json') as json_dat:
        fxlove = json.load(json_dat)
    print("JSON log file found")
except Exception:
    # (fix) was a bare `except:`, which also swallows SystemExit and
    # KeyboardInterrupt; the best-effort fallback behavior is preserved.
    print("JSON log file not found")
    fxlove = {}
# Incremental window start: the checkpointed date, or 2016-07-01 on first run.
if fxlove.get("fromDate",None) is None:
    fromDate = datetime.datetime.strptime("2016-07-01","%Y-%m-%d").date()
else:
    fromDate = datetime.datetime.strptime(fxlove.get("fromDate"),"%Y-%m-%d").date()
currentDate = datetime.datetime.now().date()
################################################################################
## Get DAU for ALL And Search Counts
################################################################################
# Daily distinct clients (DAU) per (covgeo, covosname) cell over the
# incremental window (fromDate, currentDate].
# NOTE(review): the bounds are formatted as %Y%m%d, so submission_date is
# presumably stored as a yyyymmdd string -- confirm against the schema.
daudate = sqlContext.sql("""
SELECT
covgeo,
covosname,
date,
Count(DISTINCT( client_id )) as n
FROM ms4
WHERE date > '{}' and date <= '{}'
GROUP BY date,covgeo,covosname
ORDER BY date,covgeo,covosname
""".format(datetime.datetime.strftime(fromDate,"%Y%m%d"),datetime.datetime.strftime(currentDate,"%Y%m%d") ))
daudate = daudate.toPandas()
# (fix) removed a redundant mid-script `import sys`; sys is already imported
# at the top of the file.
# Nothing new since the last checkpoint: stop without updating any state.
if len(daudate)==0:
    sys.exit(0)
################################################################################
## Get Search Information
################################################################################
# Explode the search_counts struct so every row carries a single count value.
side1 = sqlContext.sql("select covgeo, covosname, date, client_id, explode(search_counts.count) as s from ms4")
sqlContext.registerDataFrameAsTable(side1,"side1")
# Searching clients (ns) and total searches per day and cell, same window as DAU.
srcdate = sqlContext.sql("""
SELECT
covgeo,
covosname,
date,
Count(DISTINCT( client_id )) as ns,
sum(s) as searches
FROM side1
WHERE date > '{}' and date <= '{}'
GROUP BY date,covgeo,covosname
ORDER BY date,covgeo,covosname
""".format(datetime.datetime.strftime(fromDate,"%Y%m%d"),datetime.datetime.strftime(currentDate,"%Y%m%d") ))
srcdate = srcdate.toPandas()
# Attach search metrics to the DAU frame (inner join on day/cell).
daudate = daudate.merge(srcdate, on=['date','covgeo','covosname'])
if len(daudate)==0:
    sys.exit(0)
################################################################################
## Now Get WAU Information (Just Like Above)
################################################################################
# Candidate snapshot dates: every day after fromDate up to and including
# currentDate.
days = [fromDate + datetime.timedelta(days = x+1) for x in range( (currentDate-fromDate).days )]
# Keep Mondays/Wednesdays/Fridays only. (fix) materialized as a list: under
# Python 3 `filter` returns a lazy iterator, and the loop below calls
# len(days), which would raise TypeError. Identical result under Python 2.
days = [d for d in days if datetime.datetime.strftime(d,"%A") in ("Monday","Wednesday","Friday")]
collector = []  # one (x, y, z) triple of pandas DataFrames per snapshot date
# For each snapshot date d the loop computes, per (covgeo, covosname) cell:
#   x: distinct clients over the trailing 7 days  (n7, WAU)
#   y: distinct clients over the trailing 28 days (n28, MAU)
#   z: retention counts -- of the clients active in [d-27, d-7] (carbot),
#      how many were also active in the last week [d-6, d] (cartop)
for i,d in enumerate(days):
    print("{} ({}/{})".format( d, i, len(days)))
    # WAU: distinct clients over (d-7, d].
    x = sqlContext.sql("""
SELECT
covgeo,
covosname,
'{}' as date,
Count(DISTINCT( client_id )) as n7
FROM ms4
WHERE date > '{}' and date <= '{}'
GROUP BY covgeo,covosname
ORDER BY covgeo,covosname
""".format(datetime.datetime.strftime(d,"%Y%m%d")
           , datetime.datetime.strftime(d-datetime.timedelta(days = 7),"%Y%m%d")
           , datetime.datetime.strftime(d,"%Y%m%d")))
    x = x.toPandas()
    # MAU: distinct clients over (d-28, d].
    y = sqlContext.sql("""
SELECT
covgeo,
covosname,
'{}' as date,
Count(DISTINCT( client_id )) as n28
FROM ms4
WHERE date > '{}' and date <= '{}'
GROUP BY covgeo,covosname
ORDER BY covgeo,covosname
""".format(datetime.datetime.strftime(d,"%Y%m%d")
           , datetime.datetime.strftime(d-datetime.timedelta(days = 28),"%Y%m%d")
           , datetime.datetime.strftime(d,"%Y%m%d")))
    y = y.toPandas()
    # Inclusive window boundaries for the retention calculation.
    sec1 = datetime.datetime.strftime(d-datetime.timedelta(days = 27),"%Y%m%d")
    sec2 = datetime.datetime.strftime(d-datetime.timedelta(days = 7),"%Y%m%d")
    sec3 = datetime.datetime.strftime(d-datetime.timedelta(days = 6),"%Y%m%d")
    sec4 = datetime.datetime.strftime(d,"%Y%m%d")
    # Denominator population: clients active in [d-27, d-7].
    cardenom = sqlContext.sql(
        """
SELECT
client_id,
covgeo,
covosname,
'{}' as date
FROM ms4
WHERE date >= '{}' and date <= '{}'
GROUP BY client_id,covgeo,covosname
ORDER BY client_id,covgeo,covosname
""".format(sec4,sec1,sec2))
    sqlContext.registerDataFrameAsTable( cardenom,"cardenom")
    # Recent population: clients active in the last week [d-6, d].
    carnump1 = sqlContext.sql( """
SELECT
client_id,
covgeo,
covosname,
'{}' as date
FROM ms4
WHERE date >= '{}' and date <= '{}'
GROUP BY client_id,covgeo,covosname
ORDER BY client_id,covgeo,covosname
""".format(sec4,sec3,sec4))
    sqlContext.registerDataFrameAsTable(carnump1,"carnump1")
    # t1: clients present in BOTH windows within the same cell (retained).
    sqlContext.sql("""
select carnump1.client_id, carnump1.covgeo,carnump1.covosname, carnump1.date
from carnump1 join cardenom
on carnump1.client_id = cardenom.client_id
and carnump1.covgeo = cardenom.covgeo
and carnump1.covosname = cardenom.covosname
""").registerTempTable("t1")
    # cartop: retained-client counts per cell.
    sqlContext.sql("""
select covgeo, covosname, date,count(distinct(client_id)) as cartop
from t1 group by covgeo, covosname, date
""").registerTempTable("t11")
    # carbot: denominator-client counts per cell.
    sqlContext.sql("""
select covgeo, covosname, date,count(distinct(client_id)) as carbot
from cardenom group by covgeo, covosname, date
""").registerTempTable("t22")
    z = sqlContext.sql("""
select t11.covgeo, t11.covosname, t11.date,cartop, carbot
from t11 join t22
on t11.covgeo = t22.covgeo
and t11.covosname = t22.covosname
and t11.date=t22.date
""").toPandas()
    collector.append( (x,y,z) )
# Assemble the per-snapshot frames into one table: for each snapshot date,
# start from the 28-day actives and left-join the 7-day actives and the
# retention counts on the cell keys.
join_keys = ['covgeo','covosname','date']
collector2 = []
for wau_df, mau_df, ret_df in collector:
    combined = mau_df.merge(wau_df, on=join_keys, how='left')
    combined = combined.merge(ret_df, on=join_keys, how='left')
    collector2.append(combined)
waumau = pd.concat(collector2)
################################################################################
## We now import some R packages and send daudate to R
## You need to run spark.rhipe(cl) for the data table package
################################################################################
importr("base")
importr("utils")
importr("data.table")
pandas2ri.activate()  # enable automatic pandas <-> R data.frame conversion
ro.r("options(stringsAsFactors=FALSE)")
ro.globalenv['daudate'] = pandas2ri.py2ri(daudate) # send the daudate to R
# In R: coerce the count columns to numeric, append 'all' rollup rows
# (overall, by-date/geo and by-date/os margins), then add a trailing moving
# average of DAU (window of up to 7 observations) per (covgeo, covosname).
ro.r("""
Sum <- function(s) sum(s, na.rm=TRUE)
daudate <- data.table(daudate)
daudate[,":="(n=as.numeric(n),ns=as.numeric(ns),searches=as.numeric(searches))]
daudate <- rbind(daudate,
daudate[,list(covgeo='all',covosname='all', n=Sum(n),ns=Sum(ns),searches=Sum(searches)),by=date],
daudate[,list(covosname='all', n=Sum(n),ns=Sum(ns),searches=Sum(searches)),by=list(date,covgeo)],
daudate[,list(covgeo='all', n=Sum(n),ns=Sum(ns),searches=Sum(searches)),by=list(date,covosname)])
daudate[, averageDAU:=as.numeric(filter(n,rep(1,min(7,.N))/min(7,.N), sides=1)),by=list(covgeo,covosname)]
""")
ro.globalenv['waumau'] = pandas2ri.py2ri(waumau) # send the waumau frame to R
# In R: apply the same 'all' rollups to the WAU/MAU/retention table, join the
# smoothed DAU onto it, derive er (averageDAU/n28) and car (cartop/carbot),
# then drop the raw retention count columns.
ro.r("""
waumau <- data.table(waumau)
waumau <- waumau[, ":="(n7=as.numeric(n7),n28=as.numeric(n28)
,cartop=as.numeric(cartop),carbot=as.numeric(carbot))]
waumau <- rbind(waumau,
waumau[,list(covgeo='all',covosname='all', n7=Sum(n7),n28=Sum(n28),cartop=Sum(cartop),carbot=Sum(carbot))
,by=date],
waumau[,list(covosname='all', n7=Sum(n7),n28=Sum(n28),cartop=Sum(cartop),carbot=Sum(carbot))
,by=list(date,covgeo)],
waumau[,list(covgeo='all', n7=Sum(n7),n28=Sum(n28),cartop=Sum(cartop),carbot=Sum(carbot))
,by=list(date,covosname)])
waumau2 <- merge(waumau,daudate[,list(covgeo,covosname,date,averageDAU)], by=c("covosname","covgeo","date"))
waumau2[, ":="(er=averageDAU/n28,car = cartop/carbot)]
waumau2[, ":="(carbot = NULL, cartop = NULL)]
""")
# Best-effort: fetch the previously accumulated results from S3 into the R
# environment `p` (silently NULL if the download or load fails).
ro.r("""
p = new.env()
tryCatch({
system("aws s3 cp s3://mozilla-metrics/sguha/fxlove.Rdata /tmp/")
load("/tmp/fxlove.Rdata",env=p)
},error= function(e) NULL)
""")
# Append the new daudate rows to the historical ones and de-duplicate by
# (date, covosname, covgeo).
ro.r("""
library(data.table)
if(!is.null(p$daudate)){
daudate <- rbind(p$daudate, daudate)
setkey(daudate,"date","covosname","covgeo")
daudate <- unique(daudate)
}
""")
# Same append-and-dedupe treatment for waumau2.
ro.r("""
if(!is.null(p$waumau2)){
waumau2 <- rbind(p$waumau2, waumau2)
setkey(waumau2,"date","covosname","covgeo")
waumau2 <- unique(waumau2)
}
""")
# Persist the merged tables (Rdata + CSVs) and upload them back to S3.
ro.r("""
save(waumau2, daudate, file="/tmp/fxlove.Rdata")
write.csv(waumau2,file="/tmp/fxlovewaumau2.csv",row.names=FALSE)
write.csv(daudate,file="/tmp/fxlovedaudate.csv",row.names=FALSE)
system("aws s3 cp /tmp/fxlove.Rdata s3://mozilla-metrics/sguha/")
system("aws s3 cp /tmp/fxlovewaumau2.csv s3://mozilla-metrics/sguha/")
system("aws s3 cp /tmp/fxlovedaudate.csv s3://mozilla-metrics/sguha/")
""")
# Record the date up to which data has been processed so the next run resumes
# from here, then push the checkpoint back to S3 and exit cleanly.
fxlove = {'fromDate': currentDate.strftime("%Y-%m-%d")}
with open('/tmp/fxlove.json', 'w') as checkpoint_file:
    json.dump(fxlove, checkpoint_file)
subprocess.call(["aws","s3","cp","/tmp/fxlove.json","s3://mozilla-metrics/sguha/"])
sys.exit(0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment