@saptarshiguha
Created October 6, 2016 17:33
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"import pyspark.sql.functions as fun\n",
"from pyspark.sql.window import Window\n",
"from pyspark.sql import Row\n",
"from pyspark.sql.types import BooleanType ,StringType,ArrayType\n",
"from operator import add\n",
"from datetime import date, datetime,timedelta\n",
"import pandas as pd\n",
"import json\n",
"\n",
"frame = sqlContext.read.load('s3://telemetry-parquet/longitudinal/v20161004/','parquet')\n",
"frame = frame.filter(frame.build[0].application_name == \"Firefox\")\n"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"\n",
"START = '2016-07-01'\n",
"END = '2016-09-01'\n",
"DELTA = datetime.strptime(END, '%Y-%m-%d') - datetime.strptime(START,\"%Y-%m-%d\")\n",
"RangeOfDays = [ datetime.strptime(START,\"%Y-%m-%d\")+timedelta(days=x) for x in range(DELTA.days)]\n",
"RangeOfDays = [ date.strftime(x, \"%Y-%m-%d\") for x in RangeOfDays]\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"def m1(a):\n",
" ssd = [x[0:10] for x in a.subsession_start_date]\n",
" keyset = {}\n",
" channel = a.normalized_channel\n",
" ctry = a.geo_country[0]\n",
"\n",
" # if len(a.build)==0 or a.build[0].application_name !=\"Firefox\":\n",
" # yield None,None\n",
" for aday in RangeOfDays:\n",
" if aday in ssd:\n",
" dau = 1*100\n",
" else:\n",
" dau = 0\n",
" s = date.strftime(datetime.strptime(aday,\"%Y-%m-%d\")-timedelta(days=6) ,\"%Y-%m-%d\")\n",
" wau = any( [a for a in ssd if a <= aday and a>= s ] )*1*100\n",
" s = date.strftime(datetime.strptime(aday,\"%Y-%m-%d\")-timedelta(days=27) ,\"%Y-%m-%d\")\n",
" mau = any( [a for a in ssd if a <= aday and a>= s ] )*1*100\n",
" keyset[ (aday, channel,ctry) ] = (mau,wau,dau)\n",
" for k,v in keyset.iteritems():\n",
" yield k,v\n",
"\n",
"\n",
"def reduce_func(x, y):\n",
" return (x[0] + y[0], #mau\n",
" x[1] + y[1], #wau\n",
" x[2] + y[2]) #day\n",
" \n"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": false
},
"outputs": [
{
"ename": "Py4JJavaError",
"evalue": "An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.\n: org.apache.spark.SparkException: Job aborted due to stage failure: Task 241 in stage 1.0 failed 4 times, most recent failure: Lost task 241.3 in stage 1.0 (TID 1578, ip-172-31-7-145.us-west-2.compute.internal): ExecutorLostFailure (executor 49 exited caused by one of the running tasks) Reason: Container marked as failed: container_1475772235585_0001_01_002938 on host: ip-172-31-7-145.us-west-2.compute.internal. Exit status: 137. Diagnostics: Container killed on request. Exit code is 137\nContainer exited with a non-zero exit code 137\nKilled by external signal\n\nDriver stacktrace:\n\tat org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)\n\tat scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)\n\tat scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)\n\tat org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)\n\tat scala.Option.foreach(Option.scala:236)\n\tat org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)\n\tat org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)\n\tat org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)\n\tat org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)\n\tat org.apache.spark.rdd.RDD.withScope(RDD.scala:316)\n\tat org.apache.spark.rdd.RDD.collect(RDD.scala:926)\n\tat org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:405)\n\tat org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:606)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)\n\tat py4j.Gateway.invoke(Gateway.java:259)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.GatewayConnection.run(GatewayConnection.java:209)\n\tat 
java.lang.Thread.run(Thread.java:745)\n",
"output_type": "error",
"traceback": [
"\u001b[1;31m---------------------------------------------------------------------------\u001b[0m",
"\u001b[1;31mPy4JJavaError\u001b[0m Traceback (most recent call last)",
"\u001b[1;32m<ipython-input-4-78cd2f103d17>\u001b[0m in \u001b[0;36m<module>\u001b[1;34m()\u001b[0m\n\u001b[1;32m----> 1\u001b[1;33m \u001b[0mobject\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mframe\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mrdd\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mflatMap\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mm1\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mreduceByKey\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mreduce_func\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mcollect\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m",
"\u001b[1;32m/usr/lib/spark/python/pyspark/rdd.py\u001b[0m in \u001b[0;36mcollect\u001b[1;34m(self)\u001b[0m\n\u001b[0;32m 769\u001b[0m \"\"\"\n\u001b[0;32m 770\u001b[0m \u001b[1;32mwith\u001b[0m \u001b[0mSCCallSiteSync\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mcontext\u001b[0m\u001b[1;33m)\u001b[0m \u001b[1;32mas\u001b[0m \u001b[0mcss\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m--> 771\u001b[1;33m \u001b[0mport\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mctx\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0m_jvm\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mPythonRDD\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mcollectAndServe\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0m_jrdd\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mrdd\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m 772\u001b[0m \u001b[1;32mreturn\u001b[0m \u001b[0mlist\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0m_load_from_socket\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mport\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0m_jrdd_deserializer\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 773\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n",
"\u001b[1;32m/usr/lib/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py\u001b[0m in \u001b[0;36m__call__\u001b[1;34m(self, *args)\u001b[0m\n\u001b[0;32m 811\u001b[0m \u001b[0manswer\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mgateway_client\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0msend_command\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mcommand\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 812\u001b[0m return_value = get_return_value(\n\u001b[1;32m--> 813\u001b[1;33m answer, self.gateway_client, self.target_id, self.name)\n\u001b[0m\u001b[0;32m 814\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 815\u001b[0m \u001b[1;32mfor\u001b[0m \u001b[0mtemp_arg\u001b[0m \u001b[1;32min\u001b[0m \u001b[0mtemp_args\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
"\u001b[1;32m/usr/lib/spark/python/pyspark/sql/utils.py\u001b[0m in \u001b[0;36mdeco\u001b[1;34m(*a, **kw)\u001b[0m\n\u001b[0;32m 43\u001b[0m \u001b[1;32mdef\u001b[0m \u001b[0mdeco\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m*\u001b[0m\u001b[0ma\u001b[0m\u001b[1;33m,\u001b[0m \u001b[1;33m**\u001b[0m\u001b[0mkw\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 44\u001b[0m \u001b[1;32mtry\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m---> 45\u001b[1;33m \u001b[1;32mreturn\u001b[0m \u001b[0mf\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m*\u001b[0m\u001b[0ma\u001b[0m\u001b[1;33m,\u001b[0m \u001b[1;33m**\u001b[0m\u001b[0mkw\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m 46\u001b[0m \u001b[1;32mexcept\u001b[0m \u001b[0mpy4j\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mprotocol\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mPy4JJavaError\u001b[0m \u001b[1;32mas\u001b[0m \u001b[0me\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 47\u001b[0m \u001b[0ms\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0me\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mjava_exception\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mtoString\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
"\u001b[1;32m/usr/lib/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py\u001b[0m in \u001b[0;36mget_return_value\u001b[1;34m(answer, gateway_client, target_id, name)\u001b[0m\n\u001b[0;32m 306\u001b[0m raise Py4JJavaError(\n\u001b[0;32m 307\u001b[0m \u001b[1;34m\"An error occurred while calling {0}{1}{2}.\\n\"\u001b[0m\u001b[1;33m.\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m--> 308\u001b[1;33m format(target_id, \".\", name), value)\n\u001b[0m\u001b[0;32m 309\u001b[0m \u001b[1;32melse\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 310\u001b[0m raise Py4JError(\n",
"\u001b[1;31mPy4JJavaError\u001b[0m: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.\n: org.apache.spark.SparkException: Job aborted due to stage failure: Task 241 in stage 1.0 failed 4 times, most recent failure: Lost task 241.3 in stage 1.0 (TID 1578, ip-172-31-7-145.us-west-2.compute.internal): ExecutorLostFailure (executor 49 exited caused by one of the running tasks) Reason: Container marked as failed: container_1475772235585_0001_01_002938 on host: ip-172-31-7-145.us-west-2.compute.internal. Exit status: 137. Diagnostics: Container killed on request. Exit code is 137\nContainer exited with a non-zero exit code 137\nKilled by external signal\n\nDriver stacktrace:\n\tat org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)\n\tat scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)\n\tat scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)\n\tat org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)\n\tat scala.Option.foreach(Option.scala:236)\n\tat org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)\n\tat org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)\n\tat org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)\n\tat org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)\n\tat org.apache.spark.rdd.RDD.withScope(RDD.scala:316)\n\tat org.apache.spark.rdd.RDD.collect(RDD.scala:926)\n\tat org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:405)\n\tat org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:606)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)\n\tat py4j.Gateway.invoke(Gateway.java:259)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.GatewayConnection.run(GatewayConnection.java:209)\n\tat 
java.lang.Thread.run(Thread.java:745)\n"
]
}
],
"source": [
"frame = sqlContext.read.load('s3://telemetry-parquet/longitudinal/v20161004/','parquet')\n",
"frame = frame.filter(frame.build[0].application_name == \"Firefox\"\n",
"object = frame.rdd.flatMap(m1).reduceByKey(reduce_func).collect()\n"
]
},
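{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# A minimal sketch of one way to work around the failure above. Exit status 137\n",
"# means YARN killed the container, which usually indicates it exceeded its memory\n",
"# limit. Spreading the shuffle over more partitions and writing the reduced result\n",
"# to storage instead of collect()ing it can reduce that pressure. The partition\n",
"# count and output path below are illustrative assumptions, not values from the\n",
"# original notebook.\n",
"reduced = frame.rdd.flatMap(m1).reduceByKey(reduce_func, 400)\n",
"rows = reduced.map(lambda kv: Row(day=kv[0][0], channel=kv[0][1], country=kv[0][2],\n",
"                                  mau=kv[1][0], wau=kv[1][1], dau=kv[1][2]))\n",
"result = sqlContext.createDataFrame(rows)\n",
"result.write.mode('overwrite').parquet('s3://example-output-bucket/engagement-daily/')\n"
]
},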
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 2",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.11"
}
},
"nbformat": 4,
"nbformat_minor": 0
}