{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
| "#import mozillametricstools.common.functions as cf\n", | |
| "import pyspark.sql.functions as fun\n", | |
| "from pyspark.sql.window import Window\n", | |
| "from pyspark.sql import Row\n", | |
| "from pyspark.sql.types import BooleanType ,StringType,ArrayType\n", | |
| "from operator import add\n", | |
| "from datetime import date, datetime,timedelta\n", | |
| "import pandas as pd\n", | |
| "import json\n", | |
| "\n", | |
| "\n", | |
| "START = '2016-07-01'\n", | |
| "END = '2016-09-01'\n", | |
| "DELTA = datetime.strptime(END, '%Y-%m-%d') - datetime.strptime(START,\"%Y-%m-%d\")\n", | |
| "RangeOfDays = [ datetime.strptime(START,\"%Y-%m-%d\")+timedelta(days=x) for x in range(DELTA.days)]\n", | |
| "RangeOfDays = [ date.strftime(x, \"%Y-%m-%d\") for x in RangeOfDays]\n", | |
| "\n" | |
]
},
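{
"cell_type": "markdown",
"metadata": {},
"source": [
"A quick sanity check, not part of the original run: `RangeOfDays` should cover 2016-07-01 through 2016-08-31 inclusive, i.e. 62 day strings."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Minimal sketch: confirm the window size and endpoints before the cluster pass.\n",
"print len(RangeOfDays), RangeOfDays[0], RangeOfDays[-1]  # expect: 62 2016-07-01 2016-08-31\n"
]
},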
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
| "def m1(a):\n", | |
| " ssd = [x[0:10] for x in a.subsession_start_date]\n", | |
| " keyset = {}\n", | |
| " channel = a.normalized_channel\n", | |
| " ctry = a.geo_country[0]\n", | |
| "\n", | |
| " # if len(a.build)==0 or a.build[0].application_name !=\"Firefox\":\n", | |
| " # yield None,None\n", | |
| " for aday in RangeOfDays:\n", | |
| " if aday in ssd:\n", | |
| " dau = 1*100\n", | |
| " else:\n", | |
| " dau = 0\n", | |
| " s = date.strftime(datetime.strptime(aday,\"%Y-%m-%d\")-timedelta(days=6) ,\"%Y-%m-%d\")\n", | |
| " wau = any( [a for a in ssd if a <= aday and a>= s ] )*1*100\n", | |
| " s = date.strftime(datetime.strptime(aday,\"%Y-%m-%d\")-timedelta(days=27) ,\"%Y-%m-%d\")\n", | |
| " mau = any( [a for a in ssd if a <= aday and a>= s ] )*1*100\n", | |
| " keyset[ (aday, channel,ctry) ] = (mau,wau,dau)\n", | |
| " for k,v in keyset.iteritems():\n", | |
| " yield k,v\n", | |
| "\n", | |
| "\n", | |
| "def reduce_func(x, y):\n", | |
| " return (x[0] + y[0], #mau\n", | |
| " x[1] + y[1], #wau\n", | |
| " x[2] + y[2]) #day\n", | |
| " \n" | |
]
},
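{
"cell_type": "markdown",
"metadata": {},
"source": [
"A minimal local sketch, not part of the original run, of what `m1` yields: a hand-built `Row` with hypothetical values shaped like the longitudinal schema (ISO `subsession_start_date` timestamps). It shows the `((day, channel, country), (mau, wau, dau))` pairs that feed `reduceByKey`."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Hypothetical client with two subsessions in early July 2016.\n",
"fake = Row(subsession_start_date=[\"2016-07-05T00:00:00.0+02:00\",\n",
"                                  \"2016-07-03T00:00:00.0+02:00\"],\n",
"           normalized_channel=\"release\",\n",
"           geo_country=[\"US\"])\n",
"pairs = dict(m1(fake))\n",
"print pairs[(\"2016-07-05\", \"release\", \"US\")]  # (100, 100, 100): active that day\n",
"print pairs[(\"2016-07-20\", \"release\", \"US\")]  # (100, 0, 0): only the 28-day window hits\n"
]
},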
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Longitudinal dataset (one row per client); keep desktop Firefox only.\n",
"frame = sqlContext.read.load('s3://telemetry-parquet/longitudinal/v20160807/', 'parquet')\n",
"frame = frame.filter(frame.build[0].application_name == \"Firefox\")\n"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"collapsed": false
},
"outputs": [
{
"ename": "Py4JJavaError",
"evalue": "An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.\n: org.apache.spark.SparkException: Job aborted due to stage failure: Task 402 in stage 2.0 failed 4 times, most recent failure: Lost task 402.3 in stage 2.0 (TID 2287, ip-172-31-6-145.us-west-2.compute.internal): ExecutorLostFailure (executor 54 exited caused by one of the running tasks) Reason: Slave lost\nDriver stacktrace:\n\tat org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)\n\tat scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)\n\tat scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)\n\tat org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)\n\tat scala.Option.foreach(Option.scala:236)\n\tat org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)\n\tat org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)\n\tat org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)\n\tat org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)\n\tat org.apache.spark.rdd.RDD.withScope(RDD.scala:316)\n\tat org.apache.spark.rdd.RDD.collect(RDD.scala:926)\n\tat org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:405)\n\tat org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:606)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)\n\tat py4j.Gateway.invoke(Gateway.java:259)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.GatewayConnection.run(GatewayConnection.java:209)\n\tat java.lang.Thread.run(Thread.java:745)\n",
"output_type": "error",
"traceback": [
"\u001b[1;31m---------------------------------------------------------------------------\u001b[0m",
"\u001b[1;31mPy4JJavaError\u001b[0m Traceback (most recent call last)",
"\u001b[1;32m<ipython-input-5-78cd2f103d17>\u001b[0m in \u001b[0;36m<module>\u001b[1;34m()\u001b[0m\n\u001b[1;32m----> 1\u001b[1;33m \u001b[0mobject\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mframe\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mrdd\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mflatMap\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mm1\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mreduceByKey\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mreduce_func\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mcollect\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m",
"\u001b[1;32m/usr/lib/spark/python/pyspark/rdd.py\u001b[0m in \u001b[0;36mcollect\u001b[1;34m(self)\u001b[0m\n\u001b[0;32m 769\u001b[0m \"\"\"\n\u001b[0;32m 770\u001b[0m \u001b[1;32mwith\u001b[0m \u001b[0mSCCallSiteSync\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mcontext\u001b[0m\u001b[1;33m)\u001b[0m \u001b[1;32mas\u001b[0m \u001b[0mcss\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m--> 771\u001b[1;33m \u001b[0mport\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mctx\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0m_jvm\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mPythonRDD\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mcollectAndServe\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0m_jrdd\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mrdd\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m 772\u001b[0m \u001b[1;32mreturn\u001b[0m \u001b[0mlist\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0m_load_from_socket\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mport\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0m_jrdd_deserializer\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 773\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n",
"\u001b[1;32m/usr/lib/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py\u001b[0m in \u001b[0;36m__call__\u001b[1;34m(self, *args)\u001b[0m\n\u001b[0;32m 811\u001b[0m \u001b[0manswer\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mgateway_client\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0msend_command\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mcommand\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 812\u001b[0m return_value = get_return_value(\n\u001b[1;32m--> 813\u001b[1;33m answer, self.gateway_client, self.target_id, self.name)\n\u001b[0m\u001b[0;32m 814\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 815\u001b[0m \u001b[1;32mfor\u001b[0m \u001b[0mtemp_arg\u001b[0m \u001b[1;32min\u001b[0m \u001b[0mtemp_args\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
"\u001b[1;32m/usr/lib/spark/python/pyspark/sql/utils.py\u001b[0m in \u001b[0;36mdeco\u001b[1;34m(*a, **kw)\u001b[0m\n\u001b[0;32m 43\u001b[0m \u001b[1;32mdef\u001b[0m \u001b[0mdeco\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m*\u001b[0m\u001b[0ma\u001b[0m\u001b[1;33m,\u001b[0m \u001b[1;33m**\u001b[0m\u001b[0mkw\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 44\u001b[0m \u001b[1;32mtry\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m---> 45\u001b[1;33m \u001b[1;32mreturn\u001b[0m \u001b[0mf\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m*\u001b[0m\u001b[0ma\u001b[0m\u001b[1;33m,\u001b[0m \u001b[1;33m**\u001b[0m\u001b[0mkw\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m 46\u001b[0m \u001b[1;32mexcept\u001b[0m \u001b[0mpy4j\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mprotocol\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mPy4JJavaError\u001b[0m \u001b[1;32mas\u001b[0m \u001b[0me\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 47\u001b[0m \u001b[0ms\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0me\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mjava_exception\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mtoString\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
"\u001b[1;32m/usr/lib/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py\u001b[0m in \u001b[0;36mget_return_value\u001b[1;34m(answer, gateway_client, target_id, name)\u001b[0m\n\u001b[0;32m 306\u001b[0m raise Py4JJavaError(\n\u001b[0;32m 307\u001b[0m \u001b[1;34m\"An error occurred while calling {0}{1}{2}.\\n\"\u001b[0m\u001b[1;33m.\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m--> 308\u001b[1;33m format(target_id, \".\", name), value)\n\u001b[0m\u001b[0;32m 309\u001b[0m \u001b[1;32melse\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 310\u001b[0m raise Py4JError(\n",
"\u001b[1;31mPy4JJavaError\u001b[0m: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.\n: org.apache.spark.SparkException: Job aborted due to stage failure: Task 402 in stage 2.0 failed 4 times, most recent failure: Lost task 402.3 in stage 2.0 (TID 2287, ip-172-31-6-145.us-west-2.compute.internal): ExecutorLostFailure (executor 54 exited caused by one of the running tasks) Reason: Slave lost\nDriver stacktrace:\n\tat org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)\n\tat scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)\n\tat scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)\n\tat org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)\n\tat scala.Option.foreach(Option.scala:236)\n\tat org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)\n\tat org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)\n\tat org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)\n\tat org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)\n\tat org.apache.spark.rdd.RDD.withScope(RDD.scala:316)\n\tat org.apache.spark.rdd.RDD.collect(RDD.scala:926)\n\tat org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:405)\n\tat org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:606)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)\n\tat py4j.Gateway.invoke(Gateway.java:259)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.GatewayConnection.run(GatewayConnection.java:209)\n\tat java.lang.Thread.run(Thread.java:745)\n"
]
}
],
"source": [
"object = frame.rdd.flatMap(m1).reduceByKey(reduce_func).collect()\n"
]
},
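{
"cell_type": "markdown",
"metadata": {},
"source": [
"The `ExecutorLostFailure` / `Slave lost` failure above typically means executors died mid-stage, often from memory pressure, and `collect()` additionally ships the full result to the driver. Below is a hedged alternative sketch, not a verified fix: keep the reduction on the cluster, convert it to a DataFrame, and write it out (the S3 output path is hypothetical). It also avoids naming the result `object`, which shadows a Python builtin."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Sketch under the assumptions above; the output path is a placeholder.\n",
"engagement = frame.rdd.flatMap(m1).reduceByKey(reduce_func)\n",
"rows = engagement.map(lambda kv: Row(day=kv[0][0], channel=kv[0][1], country=kv[0][2],\n",
"                                     mau=kv[1][0], wau=kv[1][1], dau=kv[1][2]))\n",
"result = sqlContext.createDataFrame(rows)\n",
"result.write.mode(\"overwrite\").parquet(\"s3://YOUR-BUCKET/engagement-ratios/\")  # hypothetical path\n"
]
},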
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 2",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.11"
}
},
"nbformat": 4,
"nbformat_minor": 0
}