Skip to content

Instantly share code, notes, and snippets.

@saptarshiguha
Created October 24, 2016 23:34
Show Gist options
  • Select an option

  • Save saptarshiguha/3110f4b03258d21996f32084444d5351 to your computer and use it in GitHub Desktop.

Select an option

Save saptarshiguha/3110f4b03258d21996f32084444d5351 to your computer and use it in GitHub Desktop.
Computing counts from main_summary
# Spark bootstrap for the rest of the script.
# NOTE(review): findspark.init() is called before `import pyspark`; presumably
# it is what makes pyspark importable here, so keep this statement order —
# confirm against the findspark documentation.
import findspark
findspark.init()
import pyspark
# One SparkContext for the job, and the SQLContext built on it; every query
# further down goes through `sqlContext`.
sc = pyspark.SparkContext(appName="myAppName")
sqlContext = pyspark.sql.SQLContext(sc)
import pyspark.sql.functions as fun
from pyspark.sql.window import Window
from pyspark.sql import Row
from operator import add
import subprocess
from datetime import date, datetime,timedelta
import pandas as pd
import json
import feather
import random
# Seed the RNG so the day sample drawn below is reproducible between runs.
random.seed(100)
START = '2016-08-01'
END = '2016-09-01'
# Parse the window start once; END itself is excluded from the candidate days
# because the offsets below stop at DELTA.days - 1.
_first_day = datetime.strptime(START, "%Y-%m-%d")
DELTA = datetime.strptime(END, '%Y-%m-%d') - _first_day
_candidate_days = [_first_day + timedelta(days=offset) for offset in range(DELTA.days)]
# Only ten randomly chosen days are evaluated for the windowed (MAU/WAU) counts.
RangeOfDays = random.sample(_candidate_days, 10)
# Load the v3 main_summary parquet dataset, keep only Firefox rows and project
# the four columns the active-user queries need.
ms = sqlContext.read.load("s3://telemetry-parquet/main_summary/v3", 'parquet')
firefox_only = ms.filter(ms.app_name == 'Firefox')
u1 = firefox_only.select(
    "client_id",
    "country",
    ms.normalized_channel.alias("channel"),
    # Characters 1-10 of subsession_start_date; presumably the YYYY-MM-DD part.
    ms.subsession_start_date.substr(1, 10).alias("date"),
)
from pyspark import storagelevel
# Persisting the projection is currently switched off; line kept for reference.
##u2=u1.persist(storagelevel.StorageLevel.MEMORY_AND_DISK_SER)
# Expose the projection to Spark SQL under the table name "U2".
sqlContext.registerDataFrameAsTable(u1, "U2")
######################################################################
## Since a profile can belong to multiple countries (or channels),
## each breakdown needs to be computed separately
######################################################################
######################################################################
# MAU
######################################################################
# Monthly active users over all profiles: for each sampled day, count the
# distinct client_ids in table U2 whose date falls in the 28-day window that
# ends on that day. The result is written to a feather file and copied to S3.
maus = []
# Start counting at 1 so the last progress line reads N/N instead of (N-1)/N.
for i, day in enumerate(RangeOfDays, 1):
    eDate = day.strftime("%Y-%m-%d")
    # 27 days back plus the end day itself = 28 days, both bounds inclusive.
    sDate = (day - timedelta(days=27)).strftime("%Y-%m-%d")
    mau = sqlContext.sql("""
    select '{}', count(distinct(client_id)) as mau
    from U2
    where date >= '{}' and date <='{}'
    """.format(eDate, sDate, eDate))
    print("Working on {}/{}".format(i, len(RangeOfDays)))
    # collect() is what actually runs the query; one list of rows per day.
    maus.append(mau.collect())
# ignore_index renumbers the rows 0..n-1; without it every per-day frame
# restarts at 0 and the combined frame carries duplicate index labels.
# NOTE(review): pyarrow-backed feather writers are understood to reject a
# non-default index — confirm for the feather version in use.
mausPD = pd.concat([pd.DataFrame(rows) for rows in maus], ignore_index=True)
feather.write_dataframe(mausPD, "/tmp/mausPD.fthr")
status = subprocess.call(["aws", "s3", "cp", "/tmp/mausPD.fthr", "s3://mozilla-metrics/user/sguha/tmp/"])
# Report a failed copy instead of dropping the exit status, but keep going so
# the remaining sections still run.
if status != 0:
    print("WARNING: upload of /tmp/mausPD.fthr failed with exit code {}".format(status))
# Monthly active users per country: for each sampled day, count the distinct
# client_ids per country in the 28-day window that ends on that day.
mausCountry = []
# Start counting at 1 so the last progress line reads N/N instead of (N-1)/N.
for i, day in enumerate(RangeOfDays, 1):
    eDate = day.strftime("%Y-%m-%d")
    # 27 days back plus the end day itself = 28 days, both bounds inclusive.
    sDate = (day - timedelta(days=27)).strftime("%Y-%m-%d")
    # Separate name for the query so the loop's day variable is not rebound.
    query = sqlContext.sql("""
    select '{}', country,count(distinct(client_id)) as mau
    from U2
    where date >= '{}' and date <='{}'
    group by country
    """.format(eDate, sDate, eDate))
    print("Working on {}/{}".format(i, len(RangeOfDays)))
    # collect() is what actually runs the query; one list of rows per day.
    mausCountry.append(query.collect())
# ignore_index renumbers the rows 0..n-1; without it every per-day frame
# restarts at 0 and the combined frame carries duplicate index labels.
# NOTE(review): pyarrow-backed feather writers are understood to reject a
# non-default index — confirm for the feather version in use.
mausCountryPD = pd.concat([pd.DataFrame(rows) for rows in mausCountry], ignore_index=True)
feather.write_dataframe(mausCountryPD, "/tmp/mausCountryPD.fthr")
status = subprocess.call(["aws", "s3", "cp", "/tmp/mausCountryPD.fthr", "s3://mozilla-metrics/user/sguha/tmp/"])
# Report a failed copy instead of dropping the exit status, but keep going so
# the remaining sections still run.
if status != 0:
    print("WARNING: upload of /tmp/mausCountryPD.fthr failed with exit code {}".format(status))
# Monthly active users per release channel: for each sampled day, count the
# distinct client_ids per channel in the 28-day window that ends on that day.
mauChannel = []
# Start counting at 1 so the last progress line reads N/N instead of (N-1)/N.
for i, day in enumerate(RangeOfDays, 1):
    eDate = day.strftime("%Y-%m-%d")
    # 27 days back plus the end day itself = 28 days, both bounds inclusive.
    sDate = (day - timedelta(days=27)).strftime("%Y-%m-%d")
    # Separate name for the query so the loop's day variable is not rebound.
    query = sqlContext.sql("""
    select '{}', channel,count(distinct(client_id)) as mau
    from U2
    where date >= '{}' and date <='{}'
    group by channel
    """.format(eDate, sDate, eDate))
    print("Working on {}/{}".format(i, len(RangeOfDays)))
    # collect() is what actually runs the query; one list of rows per day.
    mauChannel.append(query.collect())
# ignore_index renumbers the rows 0..n-1; without it every per-day frame
# restarts at 0 and the combined frame carries duplicate index labels.
# NOTE(review): pyarrow-backed feather writers are understood to reject a
# non-default index — confirm for the feather version in use.
mauChannelPD = pd.concat([pd.DataFrame(rows) for rows in mauChannel], ignore_index=True)
feather.write_dataframe(mauChannelPD, "/tmp/mauChannelPD.fthr")
status = subprocess.call(["aws", "s3", "cp", "/tmp/mauChannelPD.fthr", "s3://mozilla-metrics/user/sguha/tmp/"])
# Report a failed copy instead of dropping the exit status, but keep going so
# the remaining sections still run.
if status != 0:
    print("WARNING: upload of /tmp/mauChannelPD.fthr failed with exit code {}".format(status))
######################################################################
## DAU
######################################################################
# Daily active users: distinct client_ids per calendar date in table U2,
# overall and broken down by country and by channel. Each result is written to
# a feather file and copied to S3 as soon as it has been collected, so an
# earlier result is not lost if a later query fails.
# NOTE(review): the filter is inclusive on both ends, so END itself
# (2016-09-01) is counted here — confirm that is intended.
START = '2016-08-01'
END = '2016-09-01'


def _upload_to_s3(local_path):
    """Copy local_path to the S3 scratch prefix and report a failed copy.

    Returns the exit status of the ``aws s3 cp`` command. A non-zero status
    is printed as a warning rather than raised, so the script keeps running.
    """
    status = subprocess.call(["aws", "s3", "cp", local_path, "s3://mozilla-metrics/user/sguha/tmp/"])
    if status != 0:
        print("WARNING: upload of {} failed with exit code {}".format(local_path, status))
    return status


dau = sqlContext.sql("""
select date, count(distinct(client_id)) as dau
from U2
where date >= '{}' and date <= '{}'
group by date
""".format(START, END)).collect()
feather.write_dataframe(pd.DataFrame(dau), "/tmp/dau.fthr")
_upload_to_s3("/tmp/dau.fthr")

dauCountry = sqlContext.sql("""
select date, country,count(distinct(client_id)) as dau
from U2
where date >= '{}' and date <= '{}'
group by date,country
""".format(START, END)).collect()
feather.write_dataframe(pd.DataFrame(dauCountry), "/tmp/dauCountry.fthr")
_upload_to_s3("/tmp/dauCountry.fthr")

dauChannel = sqlContext.sql("""
select date, channel,count(distinct(client_id)) as dau
from U2
where date >= '{}' and date <= '{}'
group by date,channel
""".format(START, END)).collect()
feather.write_dataframe(pd.DataFrame(dauChannel), "/tmp/dauChannel.fthr")
_upload_to_s3("/tmp/dauChannel.fthr")
######################################################################
## WAU
######################################################################
# Weekly active users over all profiles: for each sampled day, count the
# distinct client_ids in table U2 whose date falls in the 7-day window that
# ends on that day. The result is written to a feather file and copied to S3.
waus = []
# Start counting at 1 so the last progress line reads N/N instead of (N-1)/N.
for i, day in enumerate(RangeOfDays, 1):
    eDate = day.strftime("%Y-%m-%d")
    # 6 days back plus the end day itself = 7 days, both bounds inclusive.
    sDate = (day - timedelta(days=6)).strftime("%Y-%m-%d")
    # The count is aliased `wau`; the original `mau` alias was a leftover from
    # the monthly query this one was copied from.
    query = sqlContext.sql("""
    select '{}', count(distinct(client_id)) as wau
    from U2
    where date >= '{}' and date <='{}'
    """.format(eDate, sDate, eDate))
    print("Working on {}/{}".format(i, len(RangeOfDays)))
    # collect() is what actually runs the query; one list of rows per day.
    waus.append(query.collect())
# ignore_index renumbers the rows 0..n-1; without it every per-day frame
# restarts at 0 and the combined frame carries duplicate index labels.
# NOTE(review): pyarrow-backed feather writers are understood to reject a
# non-default index — confirm for the feather version in use.
wausPD = pd.concat([pd.DataFrame(rows) for rows in waus], ignore_index=True)
feather.write_dataframe(wausPD, "/tmp/wausPD.fthr")
status = subprocess.call(["aws", "s3", "cp", "/tmp/wausPD.fthr", "s3://mozilla-metrics/user/sguha/tmp/"])
# Report a failed copy instead of dropping the exit status, but keep going so
# the remaining sections still run.
if status != 0:
    print("WARNING: upload of /tmp/wausPD.fthr failed with exit code {}".format(status))
# Weekly active users per country: for each sampled day, count the distinct
# client_ids per country in the 7-day window that ends on that day.
wausCountry = []
# Start counting at 1 so the last progress line reads N/N instead of (N-1)/N.
for i, day in enumerate(RangeOfDays, 1):
    eDate = day.strftime("%Y-%m-%d")
    # 6 days back plus the end day itself = 7 days, both bounds inclusive.
    sDate = (day - timedelta(days=6)).strftime("%Y-%m-%d")
    # The count is aliased `wau`; the original `mau` alias was a leftover from
    # the monthly query this one was copied from.
    query = sqlContext.sql("""
    select '{}', country,count(distinct(client_id)) as wau
    from U2
    where date >= '{}' and date <='{}'
    group by country
    """.format(eDate, sDate, eDate))
    print("Working on {}/{}".format(i, len(RangeOfDays)))
    # collect() is what actually runs the query; one list of rows per day.
    wausCountry.append(query.collect())
# ignore_index renumbers the rows 0..n-1; without it every per-day frame
# restarts at 0 and the combined frame carries duplicate index labels.
# NOTE(review): pyarrow-backed feather writers are understood to reject a
# non-default index — confirm for the feather version in use.
wausCountryPD = pd.concat([pd.DataFrame(rows) for rows in wausCountry], ignore_index=True)
feather.write_dataframe(wausCountryPD, "/tmp/wausCountryPD.fthr")
status = subprocess.call(["aws", "s3", "cp", "/tmp/wausCountryPD.fthr", "s3://mozilla-metrics/user/sguha/tmp/"])
# Report a failed copy instead of dropping the exit status, but keep going so
# the remaining sections still run.
if status != 0:
    print("WARNING: upload of /tmp/wausCountryPD.fthr failed with exit code {}".format(status))
# Weekly active users per release channel: for each sampled day, count the
# distinct client_ids per channel in the 7-day window that ends on that day.
wauChannel = []
# Start counting at 1 so the last progress line reads N/N instead of (N-1)/N.
for i, day in enumerate(RangeOfDays, 1):
    eDate = day.strftime("%Y-%m-%d")
    # 6 days back plus the end day itself = 7 days, both bounds inclusive.
    sDate = (day - timedelta(days=6)).strftime("%Y-%m-%d")
    # The count is aliased `wau`; the original `mau` alias was a leftover from
    # the monthly query this one was copied from.
    query = sqlContext.sql("""
    select '{}', channel,count(distinct(client_id)) as wau
    from U2
    where date >= '{}' and date <='{}'
    group by channel
    """.format(eDate, sDate, eDate))
    print("Working on {}/{}".format(i, len(RangeOfDays)))
    # collect() is what actually runs the query; one list of rows per day.
    wauChannel.append(query.collect())
# ignore_index renumbers the rows 0..n-1; without it every per-day frame
# restarts at 0 and the combined frame carries duplicate index labels.
# NOTE(review): pyarrow-backed feather writers are understood to reject a
# non-default index — confirm for the feather version in use.
wauChannelPD = pd.concat([pd.DataFrame(rows) for rows in wauChannel], ignore_index=True)
feather.write_dataframe(wauChannelPD, "/tmp/wauChannelPD.fthr")
status = subprocess.call(["aws", "s3", "cp", "/tmp/wauChannelPD.fthr", "s3://mozilla-metrics/user/sguha/tmp/"])
# Report a failed copy instead of dropping the exit status, but keep going so
# the remaining sections still run.
if status != 0:
    print("WARNING: upload of /tmp/wauChannelPD.fthr failed with exit code {}".format(status))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment