Generate Queries Using Redash

  1. Compare average WAU counts across the months of June, July, and August for Firefox desktop. These queries are run on https://sql.telemetry.mozilla.org and the results downloaded as CSV files. (A plain-Python sketch of what the windowed merge computes follows these queries.)

    WITH sample AS (
        SELECT * FROM client_count
        WHERE APP_NAME = 'Firefox'
          AND activity_date >= '2016-05-01'
          AND activity_date < '2016-09-01'),
    dau AS (
        SELECT activity_date,
               merge(cast(hll AS HLL)) AS hll
        FROM sample
        GROUP BY activity_date),
    wau7 AS (
        SELECT activity_date,
               merge(hll) OVER (ORDER BY activity_date
                                ROWS BETWEEN 6 PRECEDING AND 0 FOLLOWING) AS hll
        FROM dau),
    mau28 AS (
        SELECT activity_date,
               merge(hll) OVER (ORDER BY activity_date
                                ROWS BETWEEN 27 PRECEDING AND 0 FOLLOWING) AS hll
        FROM dau)
    SELECT dau.activity_date,
           cardinality(mau28.hll) AS mau28,
           cardinality(wau7.hll)  AS wau7,
           cardinality(dau.hll)   AS dau
    FROM mau28, wau7, dau
    WHERE mau28.activity_date = dau.activity_date
      AND dau.activity_date = wau7.activity_date

  2. Compare average WAU counts across the months of June, July, and August for Firefox, broken out by channel (especially the non-release channels).

    WITH sample AS (
        SELECT * FROM client_count
        WHERE APP_NAME = 'Firefox'
          AND activity_date > '2016-05-01'
          AND activity_date < '2016-09-01'),
    dau AS (
        SELECT normalized_channel, activity_date,
               merge(cast(hll AS HLL)) AS hll
        FROM sample
        GROUP BY activity_date, normalized_channel),
    wau7 AS (
        SELECT normalized_channel, activity_date,
               merge(hll) OVER (PARTITION BY normalized_channel
                                ORDER BY activity_date
                                ROWS BETWEEN 6 PRECEDING AND 0 FOLLOWING) AS hll
        FROM dau),
    mau28 AS (
        SELECT normalized_channel, activity_date,
               merge(hll) OVER (PARTITION BY normalized_channel
                                ORDER BY activity_date
                                ROWS BETWEEN 27 PRECEDING AND 0 FOLLOWING) AS hll
        FROM dau)
    SELECT dau.normalized_channel, dau.activity_date,
           cardinality(mau28.hll) AS mau28,
           cardinality(wau7.hll)  AS wau7,
           cardinality(dau.hll)   AS dau
    FROM mau28, wau7, dau
    WHERE mau28.normalized_channel = dau.normalized_channel
      AND mau28.activity_date = dau.activity_date
      AND dau.normalized_channel = wau7.normalized_channel
      AND dau.activity_date = wau7.activity_date

  3. Compare average WAU counts across the months of June, July, and August for Firefox, broken out by country.

    WITH sample AS (
        SELECT * FROM client_count
        WHERE APP_NAME = 'Firefox'
          AND activity_date > '2016-05-01'
          AND activity_date < '2016-09-01'),
    dau AS (
        SELECT country, activity_date,
               merge(cast(hll AS HLL)) AS hll
        FROM sample
        GROUP BY activity_date, country),
    wau7 AS (
        SELECT country, activity_date,
               merge(hll) OVER (PARTITION BY country
                                ORDER BY activity_date
                                ROWS BETWEEN 6 PRECEDING AND 0 FOLLOWING) AS hll
        FROM dau),
    mau28 AS (
        SELECT country, activity_date,
               merge(hll) OVER (PARTITION BY country
                                ORDER BY activity_date
                                ROWS BETWEEN 27 PRECEDING AND 0 FOLLOWING) AS hll
        FROM dau)
    SELECT dau.country, dau.activity_date,
           cardinality(mau28.hll) AS mau28,
           cardinality(wau7.hll)  AS wau7,
           cardinality(dau.hll)   AS dau
    FROM mau28, wau7, dau
    WHERE mau28.country = dau.country
      AND mau28.activity_date = dau.activity_date
      AND dau.country = wau7.country
      AND dau.activity_date = wau7.activity_date
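
As promised above, here is a minimal sketch in plain Python of what the windowed merge computes, using toy client ids: per-day sets stand in for the HLL sketches, set union for merge(), and len() for cardinality(). The HLL columns answer exactly this question, approximately and in bounded memory.

    # Toy, exact analogue of the dau/wau7 CTEs above (hypothetical ids).
    daily = {
        "2016-07-01": {"a", "b"},
        "2016-07-02": {"b", "c"},
        "2016-07-03": {"c", "d", "e"},
    }
    dates = sorted(daily)
    for i, d in enumerate(dates):
        window = dates[max(0, i - 6):i + 1]   # ROWS BETWEEN 6 PRECEDING AND 0 FOLLOWING
        wau7 = len(set().union(*(daily[w] for w in window)))
        print(d, "dau =", len(daily[d]), "wau7 =", wau7)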

Now read these files into R,

library(data.table)
setwd("~/gdrive/mz/hll_testing")
hllall <- fread("./hll_1_all_wau_dau_mau.csv")
hllch <- fread("./hll_2_channel_wau_dau_mau.csv")
hllctry <- fread("./hll_3_country_wau_dau_mau.csv")
hllall$type <- "hll"
hllch$type <- "hll"
hllctry$type <- "hll"
setnames(hllall, c("date","mau28","wau7","dau","type")); hllall$date <- as.Date(hllall$date)
setnames(hllch, c("channel","date","mau28","wau7","dau","type")); hllch$date <- as.Date(hllch$date)
setnames(hllctry, c("geo","date","mau28","wau7","dau","type")); hllctry$date <- as.Date(hllctry$date)

Generate the Same Queries Using Main Summary

import pyspark.sql.functions as fun
from pyspark.sql.window import Window
from pyspark.sql import Row
from pyspark.sql.types import BooleanType, StringType, ArrayType
from operator import add
import random
import time
import json
from subprocess import call
from datetime import date, timedelta, datetime

## We care about two months here: July and August.
ms = sqlContext.read.load("s3://telemetry-parquet/main_summary/v3", 'parquet')
# Keep only the YYYY-MM-DD prefix of the subsession start timestamp.
u1 = ms.filter(ms.app_name == 'Firefox').select("client_id", "country",
                                                ms.normalized_channel.alias("channel"),
                                                ms.subsession_start_date.substr(1, 10).alias("date"))
sqlContext.registerDataFrameAsTable(u1, "U")

####################################
### DAU BY COUNTRY AND CHANNEL
####################################
START = '2016-07-01'
END = '2016-09-01'
# The window is [START, END), matching the Redash queries above.
dau = sqlContext.sql("""
select date, country, channel, count(distinct(client_id)) as dau
from U
where date >= '{}' and date < '{}'
group by date, country, channel
""".format(START, END))
dau = dau.collect()

with open('/tmp/hllMS_dau.json', 'w') as outfile:
    json.dump(dau, outfile)

call(["aws","s3", "cp", "/tmp/hllMS_dau.json","s3://mozilla-metrics/user/sguha/tmp/"])

####################################
### WAU BY COUNTRY AND CHANNEL
### Requires a loop over weekdays
####################################
START = '2016-06-21'        # start before July so the first full week ends in July
END = '2016-09-01'
dayOfWeek = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]
waus = {}
for x in dayOfWeek:
    # next_day(date, x) keys every activity date to the next occurrence of
    # weekday x, so all dates in the same 7-day window share one group key.
    u2 = u1.select("*", fun.date_format(fun.next_day(u1.date, x), "yyyy-MM-dd").alias("nextclosest"))
    sqlContext.registerDataFrameAsTable(u2, "U2")
    wau = sqlContext.sql("""
    select nextclosest as date, country, channel, count(distinct(client_id)) as wau
    from U2
    where date >= '{}' and date < '{}'
    group by nextclosest, country, channel
    """.format(START, END))
    waus[x] = wau.collect()
    print(x)

with open('/tmp/hllMS_wau.json', 'w') as outfile:
    json.dump(waus, outfile)

call(["aws","s3", "cp", "/tmp/hllMS_wau.json","s3://mozilla-metrics/user/sguha/tmp/"])

#######################################
## MAU
#######################################
START = '2016-07-01'
END = '2016-09-01'
DELTA = datetime.strptime(END, '%Y-%m-%d') - datetime.strptime(START, '%Y-%m-%d')
RangeOfDays = [datetime.strptime(START, '%Y-%m-%d') + timedelta(days=x) for x in range(DELTA.days)]
# A 28-day distinct count per day is expensive, so sample 31 of the 62 days.
RangeOfDays = random.sample(RangeOfDays, 31)
maus = []

for i, x in enumerate(RangeOfDays):
    eDate = date.strftime(x, "%Y-%m-%d")
    sDate = date.strftime(x - timedelta(days=27), "%Y-%m-%d")   # inclusive 28-day window
    mau = sqlContext.sql("""
    select '{}' as date, country, channel, count(distinct(client_id)) as mau
    from U
    where date >= '{}' and date <= '{}'
    group by country, channel
    """.format(eDate, sDate, eDate))
    print("Working on {}/{}".format(i + 1, len(RangeOfDays)))
    maus.append(mau.collect())

with open('/tmp/hllMS_mau.json', 'w') as outfile:
    json.dump(maus, outfile)

call(["aws","s3", "cp", "/tmp/hllMS_mau.json","s3://mozilla-metrics/user/sguha/tmp/"])

Once again we download the above data sets and combine them into R data tables shaped like the HLL ones above.

library(rjson)
## Shorthand: L = lapply, U = unlist; isn() maps NULL/empty to NA.
L <- lapply; U <- unlist
isn <- function(s, subcode=NA) if (is.null(s) || length(s) == 0) subcode else s

system("aws s3 cp s3://mozilla-metrics/user/sguha/tmp/hllMS_dau.json ./")
p <- fromJSON(paste(readLines("hllMS_dau.json"),collapse="\n"))
msdau <- data.table(date = as.Date(U(L(p, "[[",1))), geo=U(L(p,"[[",2)), channel=U(L(p,"[[",3)), dau=U(L(p,"[[",4)))
system("aws s3 cp s3://mozilla-metrics/user/sguha/tmp/hllMS_mau.json ./")
p <- fromJSON(paste(readLines("hllMS_mau.json"),collapse="\n"))
msmau <- rbindlist(lapply(p,function(u){
    data.table(date = as.Date(U(L(u, "[[",1))), geo=U(L(u,"[[",2)), channel=U(L(u,"[[",3)), mau28=U(L(u,"[[",4)))
}))
system("aws s3 cp s3://mozilla-metrics/user/sguha/tmp/hllMS_wau.json ./")
p <- fromJSON(paste(readLines("hllMS_wau.json"),collapse="\n"))
mswau <- rbindlist(lapply(p,function(u){
    data.table(date = as.Date(U(L(u, function(s) isn(s[[1]]))))
             , geo=U(L(u,"[[",2)) , channel=U(L(u,"[[",3)) , wau7=U(L(u,"[[",4)))
}))[!is.na(date),]
mseverything <- merge(merge(msmau, mswau, by=c("date","geo","channel"), all.y=TRUE), msdau, by=c("date","geo","channel"), all.y=TRUE)
mseverything$type <- "ms"
msall <- mseverything[, list(mau28=sum(mau28,na.rm=TRUE), wau7=sum(wau7,na.rm=TRUE), dau=sum(dau)), by=c("date","type")]
msctry <- mseverything[, list(mau28=sum(mau28,na.rm=TRUE), wau7=sum(wau7,na.rm=TRUE), dau=sum(dau)), by=c("geo","date","type")]
mschannel <- mseverything[, list(mau28=sum(mau28,na.rm=TRUE), wau7=sum(wau7,na.rm=TRUE), dau=sum(dau)), by=c("channel","date","type")]

Generate Queries Using Longitudinal

import findspark
findspark.init()
import pyspark
sc = pyspark.SparkContext(appName="myAppName")
sqlContext = pyspark.sql.SQLContext(sc)


#import mozillametricstools.common.functions as cf
import pyspark.sql.functions as fun
from pyspark.sql.window import Window
from pyspark.sql import Row
from pyspark.sql.types import BooleanType, StringType, ArrayType
from operator import add
from datetime import date, datetime,timedelta
import pandas as pd
import json


START       = '2016-07-01'
END         = '2016-09-01'
DELTA       = datetime.strptime(END, '%Y-%m-%d')  - datetime.strptime(START,"%Y-%m-%d")
RangeOfDays = [ datetime.strptime(START,"%Y-%m-%d")+timedelta(days=x) for x in range(DELTA.days)]
RangeOfDays = [ date.strftime(x, "%Y-%m-%d") for x in RangeOfDays]

def m1(a):
    # Emit ((day, channel, country), (mau, wau, dau)) indicator tuples for
    # one longitudinal client record. Rows are pre-filtered to Firefox
    # builds before m1 is applied (see below).
    ssd = [x[0:10] for x in a.subsession_start_date]
    keyset = {}
    channel = a.normalized_channel
    ctry = a.geo_country[0]
    for aday in RangeOfDays:
        # Longitudinal is a 1% sample of clients, so each active client
        # counts as 100 toward the population estimate.
        dau = 100 if aday in ssd else 0
        s = date.strftime(datetime.strptime(aday, "%Y-%m-%d") - timedelta(days=6), "%Y-%m-%d")
        wau = any(d <= aday and d >= s for d in ssd) * 100
        s = date.strftime(datetime.strptime(aday, "%Y-%m-%d") - timedelta(days=27), "%Y-%m-%d")
        mau = any(d <= aday and d >= s for d in ssd) * 100
        keyset[(aday, channel, ctry)] = (mau, wau, dau)
    for k, v in keyset.iteritems():
        yield k, v


def reduce_func(x, y):
    # Element-wise sum of the (mau, wau, dau) indicator tuples.
    return (x[0] + y[0],  # mau
            x[1] + y[1],  # wau
            x[2] + y[2])  # dau

frame = sqlContext.read.load('s3://telemetry-parquet/longitudinal/v20161004/', 'parquet')
frame = frame.filter(frame.build[0].application_name == "Firefox")
results = frame.rdd.flatMap(m1).reduceByKey(reduce_func).collect()
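
To see what the flatMap/reduceByKey pipeline computes, here is a minimal plain-Python sketch with hypothetical emitted pairs: every client contributes a 0/100 indicator per (day, channel, country) key, and summing over clients gives the scaled counts.

from collections import defaultdict

emitted = [  # (key, (mau, wau, dau)) pairs, as m1 would yield them
    (("2016-07-01", "release", "US"), (100, 100, 100)),
    (("2016-07-01", "release", "US"), (100, 100, 0)),
    (("2016-07-01", "release", "US"), (100, 0, 0)),
]
totals = defaultdict(lambda: (0, 0, 0))
for k, v in emitted:
    totals[k] = tuple(p + q for p, q in zip(totals[k], v))
print(dict(totals))   # {('2016-07-01', 'release', 'US'): (300, 200, 100)}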


# Alternative: process the dataset one partition at a time; /home/hadoop/x
# holds a JSON list of partition names.
with open('/home/hadoop/x') as json_data:
    d = json.load(json_data)
    print(d)

import sys
for i, x in enumerate(d):
    frame = sqlContext.read.load('s3://telemetry-parquet/longitudinal/v20160925/{}'.format(x), 'parquet')
    frame = frame.filter(frame.build[0].application_name == "Firefox")
    print('{}/{} : s3://telemetry-parquet/longitudinal/v20160925/{}'.format(i + 1, len(d), x))
    sys.stdout.flush()
    # Note: as written this keeps only the last partition's aggregates.
    results = frame.rdd.flatMap(m1).reduceByKey(reduce_func).collect()



results2 = [{'date': x[0][0], 'channel': x[0][1], 'geo': x[0][2],
             'mau28': x[1][0], 'wau7': x[1][1], 'dau': x[1][2]} for x in results]
objdf = pd.DataFrame(results2)

import feather
from subprocess import call
feather.write_dataframe(objdf, "/tmp/onepct.fthr")
call(["aws", "s3", "cp", "/tmp/onepct.fthr", "s3://mozilla-metrics/user/sguha/tmp/"])

And load this data into R,

library(feather)
system("aws s3 cp  s3://mozilla-metrics/user/sguha/tmp/onepct.fthr /tmp/")
onepct <- data.table(read_feather("/tmp/onepct.fthr"))
onepct$date <- as.Date(onepct$date)
onepct$type <- "1pct"
oneall <- onepct[, list( mau28=sum(mau28), wau7=sum(wau7),dau=sum(dau)),by=c("date","type")]
onectry <- onepct[, list( mau28=sum(mau28), wau7=sum(wau7),dau=sum(dau)),by=c("geo","date","type")]
onechannel <- onepct[, list( mau28=sum(mau28), wau7=sum(wau7),dau=sum(dau)),by=c("channel","date","type")]

oneall;hllall;msall;
onechannel;hllch;mschannel;
onectry;hllctry;msctry

oneall1 <- oneall[,list(date,mau281=mau28, wau71=wau7, dau1=dau)][order(date),][!is.na(date) & date %between% c("2016-07-01","2016-08-31"),]
msall1 <- msall[,list(date, mau28ms=mau28, wau7ms=wau7,daums=dau)][order(date),][!is.na(date) & date %between% c("2016-07-01","2016-08-31"),]
hllall1 <- hllall[,list(date, mau28hll=mau28, wau7hll=wau7,dauhll=dau)][order(date),][!is.na(date) & date %between% c("2016-07-01","2016-08-31"),]