Skip to content

Instantly share code, notes, and snippets.

@fperez
Last active December 21, 2015 23:48
Show Gist options
  • Save fperez/6384491 to your computer and use it in GitHub Desktop.
Save fperez/6384491 to your computer and use it in GitHub Desktop.
HowTo for starting an IPython Notebook server
Display the source blob
Display the rendered blob
Raw
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Display the source blob
Display the rendered blob
Raw
{
"metadata": {
"name": ""
},
"nbformat": 3,
"nbformat_minor": 0,
"worksheets": [
{
"cells": [
{
"cell_type": "heading",
"level": 1,
"metadata": {},
"source": [
"Interactive Analysis with PySpark and IPython"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"*Credit:* This is an IPython-ized version of the [official AmpCamp 2013 Tutorial](http://ampcamp.berkeley.edu/big-data-mini-course/data-exploration-using-spark.html), all content credit goes to the AMPLab team.\n",
"\n",
"Let's start by creating our Spark context:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"from pyspark import SparkContext\n",
"sc = SparkContext( CLUSTER_URL, 'pyspark')\n",
"sc"
],
"language": "python",
"metadata": {},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 1,
"text": [
"<pyspark.context.SparkContext at 0x2dc4950>"
]
}
],
"prompt_number": 1
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**1.** Warm up by creating an RDD (Resilient Distributed Dataset) named `pagecounts` from the input files, in this case 3 days of Wikipedia traffic:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"pagecounts = sc.textFile(\"/wiki/pagecounts\")\n",
"pagecounts"
],
"language": "python",
"metadata": {},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 2,
"text": [
"<pyspark.rdd.RDD at 0x3015d10>"
]
}
],
"prompt_number": 2
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**2.** Let\u2019s take a peek at the data. You can use the take operation of an RDD to get the first K records. Here, K = 10."
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"pagecounts.take(10)"
],
"language": "python",
"metadata": {},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 3,
"text": [
"[u'20090505-000000 aa.b ?71G4Bo1cAdWyg 1 14463',\n",
" u'20090505-000000 aa.b Special:Statistics 1 840',\n",
" u'20090505-000000 aa.b Special:Whatlinkshere/MediaWiki:Returnto 1 1019',\n",
" u'20090505-000000 aa.b Wikibooks:About 1 15719',\n",
" u'20090505-000000 aa ?14mFX1ildVnBc 1 13205',\n",
" u'20090505-000000 aa ?53A%2FuYP3FfnKM 1 13207',\n",
" u'20090505-000000 aa ?93HqrnFc%2EiqRU 1 13199',\n",
" u'20090505-000000 aa ?95iZ%2Fjuimv31g 1 13201',\n",
" u'20090505-000000 aa File:Wikinews-logo.svg 1 8357',\n",
" u'20090505-000000 aa Main_Page 2 9980']"
]
}
],
"prompt_number": 3
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**3.** Let\u2019s see how many records in total are in this data set (this command will take a while, so read ahead while it is running)."
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"%time pagecounts.count()"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"CPU times: user 64 ms, sys: 16 ms, total: 80 ms\n",
"Wall time: 1min 46s\n"
]
},
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 4,
"text": [
"329641466"
]
}
],
"prompt_number": 4
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This should launch 177 Spark tasks on the Spark cluster. If you look closely at the terminal, the console log is pretty chatty and tells you the progress of the tasks. Because we are reading 20G of data from HDFS, this task is I/O bound and can take a while to scan through all the data (2 - 3 mins). \ufffc While it\u2019s running, you can open the Spark web console to see the progress:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"from IPython.display import HTML, IFrame, display\n",
"my_url = CLUSTER_URL.split(':')[1]\n",
"spark_console = 'http:'+my_url+':8080'\n",
"\n",
"display(HTML('You can see the <a href=\"%s\" target=\"_blank\">Spark Console</a> here:' \n",
" % spark_console))\n",
"IFrame(spark_console, 800, 300)"
],
"language": "python",
"metadata": {},
"outputs": [
{
"html": [
"You can see the <a href=\"http://ec2-54-224-169-175.compute-1.amazonaws.com:8080\" target=\"_blank\">Spark Console</a> here:"
],
"metadata": {},
"output_type": "display_data",
"text": [
"<IPython.core.display.HTML at 0x302a310>"
]
},
{
"html": [
"\n",
" <iframe\n",
" width=\"800\"\n",
" height=300\"\n",
" src=\"http://ec2-54-224-169-175.compute-1.amazonaws.com:8080\"\n",
" frameborder=\"0\"\n",
" allowfullscreen\n",
" ></iframe>\n",
" "
],
"metadata": {},
"output_type": "pyout",
"prompt_number": 5,
"text": [
"<IPython.lib.display.IFrame at 0x302a290>"
]
}
],
"prompt_number": 5
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**4.** Recall from above when we described the format of the data set, that the second field is the \u201cproject code\u201d and contains information about the language of the pages. For example, the project code \u201cen\u201d indicates an English page. Let\u2019s derive an RDD containing only English pages from pagecounts. This can be done by applying a filter function to pagecounts. For each record, we can split it by the field delimiter (i.e. a space) and get the second field-\u2013 and then compare it with the string \u201cen\u201d.\n",
"\n",
"To avoid reading from disks each time we perform any operations on the RDD, we also cache the RDD into memory. This is where Spark really starts to to shine."
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"enPages = pagecounts.filter(lambda x: x.split(\" \")[1] == \"en\").cache()"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 6
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"When you type this command into the Spark shell, Spark defines the RDD, but because of lazy evaluation, no computation is done yet. Next time any action is invoked on enPages, Spark will cache the data set in memory across the 5 slaves in your cluster.\n",
"\n",
"**5.** How many records are there for English pages?"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"%time enPages.count()"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"CPU times: user 76 ms, sys: 36 ms, total: 112 ms\n",
"Wall time: 2min 48s\n"
]
},
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 7,
"text": [
"122352588"
]
}
],
"prompt_number": 7
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The first time this command is run, similar to the last count we did, it will take 2 - 3 minutes while Spark scans through the entire data set on disk. **But since enPages was marked as \u201ccached\u201d in the previous step, if you run count on the same RDD again, it should return an order of magnitude faster:**"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"%time enPages.count()"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"CPU times: user 28 ms, sys: 32 ms, total: 60 ms\n",
"Wall time: 6.26 s\n"
]
},
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 8,
"text": [
"122352588"
]
}
],
"prompt_number": 8
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"If you examine the console log closely, you will see lines like this, indicating some data was added to the cache:\n",
"\n",
" 13/02/05 20:29:01 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Added rdd_2_172 in memory on ip-10-188-18-127.ec2.internal:42068 (size: 271.8 MB, free: 5.5 GB)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**6.** Let\u2019s try something fancier. Generate a histogram of total page views on Wikipedia English pages for the date range represented in our dataset (May 5 to May 7, 2009). The high level idea of what we\u2019ll be doing is as follows. First, we generate a key value pair for each line; the key is the date (the first eight characters of the first field), and the value is the number of pageviews for that date (the fourth field)."
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"enTuples = enPages.map(lambda x: x.split(\" \"))\n",
"enKeyValuePairs = enTuples.map(lambda x: (x[0][:8], int(x[3])))"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 9
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Next, we shuffle the data and group all values of the same key together. Finally we sum up the values for each key. There is a convenient method called reduceByKey in Spark for exactly this pattern. Note that the second argument to reduceByKey determines the number of reducers to use. By default, Spark assumes that the reduce function is algebraic and applies combiners on the mapper side. Since we know there is a very limited number of keys in this case (because there are only 3 unique dates in our data set), let\u2019s use only one reducer."
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"enKeyValuePairs.reduceByKey(lambda x, y: x + y, 1).collect()"
],
"language": "python",
"metadata": {},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 10,
"text": [
"[(u'20090506', 204190442), (u'20090507', 202617618), (u'20090505', 207698578)]"
]
}
],
"prompt_number": 10
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The collect method at the end converts the result from an RDD to an array.\n",
"\n",
"We can combine the previous three commands into one:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"enPages.map(\n",
" lambda x: x.split(\" \")\n",
" ).map(\n",
" lambda x: (x[0][:8], int(x[3]))\n",
" ).reduceByKey(\n",
" lambda x, y: x + y, 1\n",
" ).collect()"
],
"language": "python",
"metadata": {},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 11,
"text": [
"[(u'20090506', 204190442), (u'20090507', 202617618), (u'20090505', 207698578)]"
]
}
],
"prompt_number": 11
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**7.** Suppose we want to find pages that were viewed more than 200,000 times during the three days covered by our dataset. Conceptually, this task is similar to the previous query. But, given the large number of pages (23 million distinct page names), the new task is very expensive. We are doing an expensive group-by with a lot of network shuffling of data.\n",
"\n",
"To recap, first we split each line of data into its respective fields. Next, we extract the fields for page name and number of page views. We reduce by key again, this time with 40 reducers. Then we filter out pages with less than 200,000 total views over our time window represented by our dataset."
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"%%time\n",
"fields = enPages.map(lambda x: x.split(\" \"))\n",
"title_nhits = fields.map(lambda x: (x[2], int(x[3])))\n",
"nhits = title_nhits.reduceByKey(lambda x, y: x + y, 40)\n",
"high_traffic = nhits.filter(lambda x: x[1] > 200000).map(lambda x: (x[1], x[0])).collect()\n",
"high_traffic"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"CPU times: user 104 ms, sys: 20 ms, total: 124 ms\n",
"Wall time: 3min 27s\n"
]
}
],
"prompt_number": 12
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"There is no hard and fast way to calculate the optimal number of reducers for a given problem; you will build up intuition over time by experimenting with different values.\n",
"\n",
"You can explore the full RDD API by browsing the [Java/Scala](http://www.cs.berkeley.edu/~pwendell/strataconf/api/core/index.html#spark.RDD) or [Python](http://www.cs.berkeley.edu/~pwendell/strataconf/api/pyspark/index.html) API docs."
]
},
{
"cell_type": "heading",
"level": 2,
"metadata": {},
"source": [
"Running Standalone Spark Programs"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Because of time constraints, in this tutorial we focus on ad-hoc style analytics using the Spark shell. However, for many tasks, it makes more sense to write a standalone Spark program. We will return to this in the section on Spark Streaming below, where you will actually write a standalone Spark Streaming job. We aren\u2019t going to cover how to structure, build, and run standalone Spark jobs here, but before we move on, we list here a few resources about standalone Spark jobs for you to come back and explore later.\n",
"\n",
"First, on the AMI for this tutorial we have included \u201ctemplate\u201d projects for Scala and Java standalone programs for both Spark and Spark streaming. The Spark ones can be found in the `/root/scala-app-template` and `/root/java-app-template` directories (we will discuss the Streaming ones later). Feel free to browse through the contents of those directories. You can also find examples of building and running Spark standalone jobs in [Java](http://spark.incubator.apache.org/docs/0.6.0/quick-start.html#a-standalone-job-in-java) and in [Scala](http://www.spark-project.org/docs/0.6.0/quick-start.html#a-standalone-job-in-scala) as part of the Spark Quick Start Guide. For even more details, see Matei Zaharia\u2019s [slides](http://ampcamp.berkeley.edu/wp-content/uploads/2012/06/matei-zaharia-part-2-amp-camp-2012-standalone-programs.pdf) and [talk video](http://www.youtube.com/watch?v=7k4yDKBYOcw&t=59m37s) about Standalone Spark jobs at the [first AMP Camp](http://ampcamp.berkeley.edu/agenda-2012)."
]
}
],
"metadata": {}
}
]
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment