Created
October 24, 2016 23:34
-
-
Save saptarshiguha/3110f4b03258d21996f32084444d5351 to your computer and use it in GitHub Desktop.
Computing counts from main_summary
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# --------------------------------------------------------------------------
# Spark bootstrap + analysis window.
# findspark must locate the Spark installation before pyspark is importable.
# --------------------------------------------------------------------------
import findspark
findspark.init()

import pyspark
import pyspark.sql.functions as fun
from pyspark.sql.window import Window
from pyspark.sql import Row

import json
import random
import subprocess
from datetime import date, datetime, timedelta
from operator import add

import feather
import pandas as pd

sc = pyspark.SparkContext(appName="myAppName")
sqlContext = pyspark.sql.SQLContext(sc)

# Fixed seed so the 10 sampled days (and hence the MAU/WAU estimates
# below) are reproducible across runs.
random.seed(100)

# Analysis window: every day of August 2016 (END is exclusive).
START = '2016-08-01'
END = '2016-09-01'
DELTA = datetime.strptime(END, '%Y-%m-%d') - datetime.strptime(START, "%Y-%m-%d")
RangeOfDays = [
    datetime.strptime(START, "%Y-%m-%d") + timedelta(days=offset)
    for offset in range(DELTA.days)
]
# The windowed counts are expensive, so estimate them on a random
# sample of 10 days rather than all 31.
RangeOfDays = random.sample(RangeOfDays, 10)
# main_summary v3 parquet: one row per subsession ping.
ms = sqlContext.read.load("s3://telemetry-parquet/main_summary/v3", 'parquet')

# Keep desktop Firefox only, and project just the columns the counts need.
# subsession_start_date is truncated to its "YYYY-MM-DD" prefix and exposed
# as `date`.
u1 = (
    ms.filter(ms.app_name == 'Firefox')
      .select(
          "client_id",
          "country",
          ms.normalized_channel.alias("channel"),
          ms.subsession_start_date.substr(1, 10).alias("date"),
      )
)

from pyspark import storagelevel
# #u2=u1.persist(storagelevel.StorageLevel.MEMORY_AND_DISK_SER)
# Register as a temp table so the sections below can query it with SQL.
sqlContext.registerDataFrameAsTable(u1, "U2")
######################################################################
## Since profiles can belong to multiple countries the per-country /
## per-channel counts need to be done separately (a single grouped
## query would otherwise double-count clients).
######################################################################
######################################################################
# MAU: distinct clients active in the 28-day window ending on each
# sampled day.
######################################################################
maus = []
for i, x in enumerate(RangeOfDays):
    eDate = date.strftime(x, "%Y-%m-%d")
    sDate = date.strftime(x - timedelta(days=27), "%Y-%m-%d")
    # FIX: alias the date literal as `day`. Unaliased, Spark names the
    # column after the literal itself (a different name every iteration),
    # so the pd.concat below misaligned the per-day frames into NaNs.
    mau = sqlContext.sql("""
        select '{}' as day, count(distinct(client_id)) as mau
        from U2
        where date >= '{}' and date <='{}'
        """.format(eDate, sDate, eDate))
    print("Working on {}/{}".format(i, len(RangeOfDays)))
    maus.append(mau.collect())

# One row per sampled day; columns are now stable (`day`, `mau`).
mausPD = pd.concat([pd.DataFrame(x) for x in maus])
feather.write_dataframe(mausPD, "/tmp/mausPD.fthr")
subprocess.call(["aws", "s3", "cp", "/tmp/mausPD.fthr",
                 "s3://mozilla-metrics/user/sguha/tmp/"])
# MAU broken down by country, same 28-day windows as above.
mausCountry = []
for i, x in enumerate(RangeOfDays):
    eDate = date.strftime(x, "%Y-%m-%d")
    sDate = date.strftime(x - timedelta(days=27), "%Y-%m-%d")
    # FIX: alias the date literal as `day`; unaliased it produced a
    # different column name per iteration and broke the pd.concat below.
    x = sqlContext.sql("""
        select '{}' as day, country, count(distinct(client_id)) as mau
        from U2
        where date >= '{}' and date <='{}'
        group by country
        """.format(eDate, sDate, eDate))
    print("Working on {}/{}".format(i, len(RangeOfDays)))
    mausCountry.append(x.collect())

# One row per (sampled day, country).
mausCountryPD = pd.concat([pd.DataFrame(x) for x in mausCountry])
feather.write_dataframe(mausCountryPD, "/tmp/mausCountryPD.fthr")
subprocess.call(["aws", "s3", "cp", "/tmp/mausCountryPD.fthr",
                 "s3://mozilla-metrics/user/sguha/tmp/"])
# MAU broken down by release channel, same 28-day windows as above.
mauChannel = []
for i, x in enumerate(RangeOfDays):
    eDate = date.strftime(x, "%Y-%m-%d")
    sDate = date.strftime(x - timedelta(days=27), "%Y-%m-%d")
    # FIX: alias the date literal as `day`; unaliased it produced a
    # different column name per iteration and broke the pd.concat below.
    x = sqlContext.sql("""
        select '{}' as day, channel, count(distinct(client_id)) as mau
        from U2
        where date >= '{}' and date <='{}'
        group by channel
        """.format(eDate, sDate, eDate))
    print("Working on {}/{}".format(i, len(RangeOfDays)))
    mauChannel.append(x.collect())

# One row per (sampled day, channel).
mauChannelPD = pd.concat([pd.DataFrame(x) for x in mauChannel])
feather.write_dataframe(mauChannelPD, "/tmp/mauChannelPD.fthr")
subprocess.call(["aws", "s3", "cp", "/tmp/mauChannelPD.fthr",
                 "s3://mozilla-metrics/user/sguha/tmp/"])
######################################################################
## DAU: distinct clients per calendar day, overall and broken down by
## country and by channel.
######################################################################
START = '2016-08-01'
END = '2016-09-01'

# NOTE(review): `date <= END` includes 2016-09-01 itself — one day beyond
# the August window the MAU/WAU samples draw from. Confirm intended.
dau = sqlContext.sql("""
    select date, count(distinct(client_id)) as dau
    from U2
    where date >= '{}' and date <= '{}'
    group by date
    """.format(START, END))
dau = dau.collect()

# Per-country daily counts.
dauCountry = sqlContext.sql("""
    select date, country,count(distinct(client_id)) as dau
    from U2
    where date >= '{}' and date <= '{}'
    group by date,country
    """.format(START, END)).collect()
feather.write_dataframe(pd.DataFrame(dauCountry), "/tmp/dauCountry.fthr")
subprocess.call(["aws", "s3", "cp", "/tmp/dauCountry.fthr",
                 "s3://mozilla-metrics/user/sguha/tmp/"])

# Per-channel daily counts.
dauChannel = sqlContext.sql("""
    select date, channel,count(distinct(client_id)) as dau
    from U2
    where date >= '{}' and date <= '{}'
    group by date,channel
    """.format(START, END)).collect()
feather.write_dataframe(pd.DataFrame(dauChannel), "/tmp/dauChannel.fthr")
subprocess.call(["aws", "s3", "cp", "/tmp/dauChannel.fthr",
                 "s3://mozilla-metrics/user/sguha/tmp/"])

# Overall daily counts (collected first above, written last, matching
# the original side-effect order).
feather.write_dataframe(pd.DataFrame(dau), "/tmp/dau.fthr")
subprocess.call(["aws", "s3", "cp", "/tmp/dau.fthr",
                 "s3://mozilla-metrics/user/sguha/tmp/"])
######################################################################
## WAU: distinct clients active in the 7-day window ending on each
## sampled day.
######################################################################
waus = []
for i, x in enumerate(RangeOfDays):
    eDate = date.strftime(x, "%Y-%m-%d")
    sDate = date.strftime(x - timedelta(days=6), "%Y-%m-%d")
    # FIX 1: alias the date literal as `day` so every iteration yields the
    # same column name (unaliased, pd.concat below misaligned the frames).
    # FIX 2: the count is a WAU, not an MAU — relabel the copy-pasted
    # `mau` alias to `wau`.
    x = sqlContext.sql("""
        select '{}' as day, count(distinct(client_id)) as wau
        from U2
        where date >= '{}' and date <='{}'
        """.format(eDate, sDate, eDate))
    print("Working on {}/{}".format(i, len(RangeOfDays)))
    waus.append(x.collect())

# One row per sampled day (`day`, `wau`).
wausPD = pd.concat([pd.DataFrame(x) for x in waus])
feather.write_dataframe(wausPD, "/tmp/wausPD.fthr")
subprocess.call(["aws", "s3", "cp", "/tmp/wausPD.fthr",
                 "s3://mozilla-metrics/user/sguha/tmp/"])
# WAU broken down by country, same 7-day windows as above.
wausCountry = []
for i, x in enumerate(RangeOfDays):
    eDate = date.strftime(x, "%Y-%m-%d")
    sDate = date.strftime(x - timedelta(days=6), "%Y-%m-%d")
    # FIX 1: alias the date literal as `day` (stable column name for the
    # pd.concat below). FIX 2: relabel the copy-pasted `mau` alias to
    # `wau` — this is a weekly count.
    x = sqlContext.sql("""
        select '{}' as day, country, count(distinct(client_id)) as wau
        from U2
        where date >= '{}' and date <='{}'
        group by country
        """.format(eDate, sDate, eDate))
    print("Working on {}/{}".format(i, len(RangeOfDays)))
    wausCountry.append(x.collect())

# One row per (sampled day, country).
wausCountryPD = pd.concat([pd.DataFrame(x) for x in wausCountry])
feather.write_dataframe(wausCountryPD, "/tmp/wausCountryPD.fthr")
subprocess.call(["aws", "s3", "cp", "/tmp/wausCountryPD.fthr",
                 "s3://mozilla-metrics/user/sguha/tmp/"])
# WAU broken down by release channel, same 7-day windows as above.
wauChannel = []
for i, x in enumerate(RangeOfDays):
    eDate = date.strftime(x, "%Y-%m-%d")
    sDate = date.strftime(x - timedelta(days=6), "%Y-%m-%d")
    # FIX 1: alias the date literal as `day` (stable column name for the
    # pd.concat below). FIX 2: relabel the copy-pasted `mau` alias to
    # `wau` — this is a weekly count.
    x = sqlContext.sql("""
        select '{}' as day, channel, count(distinct(client_id)) as wau
        from U2
        where date >= '{}' and date <='{}'
        group by channel
        """.format(eDate, sDate, eDate))
    print("Working on {}/{}".format(i, len(RangeOfDays)))
    wauChannel.append(x.collect())

# One row per (sampled day, channel).
wauChannelPD = pd.concat([pd.DataFrame(x) for x in wauChannel])
feather.write_dataframe(wauChannelPD, "/tmp/wauChannelPD.fthr")
subprocess.call(["aws", "s3", "cp", "/tmp/wauChannelPD.fthr",
                 "s3://mozilla-metrics/user/sguha/tmp/"])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment