@maasg
Created December 16, 2016 11:24
{
"metadata": {
"name": "stockQuotes",
"user_save_timestamp": "1970-01-01T01:00:00.000Z",
"auto_save_timestamp": "1970-01-01T01:00:00.000Z",
"language_info": {
"name": "scala",
"file_extension": "scala",
"codemirror_mode": "text/x-scala"
},
"trusted": true,
"customLocalRepo": null,
"customRepos": null,
"customDeps": null,
"customImports": null,
"customArgs": null,
"customSparkConf": null
},
"cells": [
{
"metadata": {
"id": "FC49BEFBEF0740F0AA69845BD5E00F1D"
},
"cell_type": "markdown",
"source": "Notebook created with the [Spark Notebook](http://spark-notebook.io)"
},
{
"metadata": {
"id": "503909B6C8F44F9795190C3F4F960EB2"
},
"cell_type": "markdown",
"source": "# Extracting ticker symbols from a stock quotes CSV"
},
{
"metadata": {
"id": "1E5CDEC106F34CC88DEC3F1B8BE3904B"
},
"cell_type": "markdown",
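"source": "We're going to work with a sample of stock quotes. The original dataset has this format:\n```\nexchange, stock_symbol, date, stock_price_open, stock_price_high, stock_price_low, stock_price_close, stock_volume, stock_price_adj_close\n```\n\nThe cleaned file we load here (`/tmp/quotes_clean.csv`) has only seven comma-separated fields per line: exchange, stock symbol and date come first, followed by four numeric columns. Let's look at some data first:"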
},
{
"metadata": {
"trusted": true,
"input_collapsed": false,
"collapsed": false,
"presentation": {
"tabs_state": "{\n \"tab_id\": \"#tab1016190705-0\"\n}",
"pivot_chart_state": "{\n \"hiddenAttributes\": [],\n \"menuLimit\": 200,\n \"cols\": [],\n \"rows\": [],\n \"vals\": [],\n \"exclusions\": {},\n \"inclusions\": {},\n \"unusedAttrsVertical\": 85,\n \"autoSortUnusedAttrs\": false,\n \"inclusionsInfo\": {},\n \"aggregatorName\": \"Count\",\n \"rendererName\": \"Table\"\n}"
},
"id": "20E0A52D823240CE8D3290118424A538"
},
"cell_type": "code",
"source": "val dataset = sparkContext.textFile(\"/tmp/quotes_clean.csv\")\ndataset.take(10)\n ",
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "dataset: org.apache.spark.rdd.RDD[String] = /tmp/quotes_clean.csv MapPartitionsRDD[1] at textFile at <console>:77\nres3: Array[String] = Array(NYSE,DIA,2/5/2016,164.14,163.87,310868,163.78, NYSE,AAPL,2/5/2016,96.6000,96.5100,2046336,96.2899, NYSE,AXP,2/5/2016,54.38,54.72,545726,54.74, NYSE,BA,2/5/2016,123.61,123.71,186882,123.69, NYSE,CAT,2/5/2016,65.96,65.67,386350,65.75, NYSE,CSCO,2/5/2016,23.540,23.510,1689617,23.535, NYSE,CVX,2/5/2016,84.79,83.94,558915,83.82, NYSE,DD,2/5/2016,59.10,59.29,158011,59.05, NYSE,DIS,2/5/2016,95.430,95.320,301590,95.275, NYSE,GE,2/5/2016,29.180,29.160,1651780,29.015)\n"
},
{
"metadata": {},
"data": {
"text/html": ""
},
"output_type": "execute_result",
"execution_count": 2,
"time": "Took: 1 second 736 milliseconds, at 2016-12-16 12:4"
}
]
},
{
"metadata": {
"id": "80D6970759B843469E0BCBF301ECB569"
},
"cell_type": "markdown",
"source": "### First, we read the CSV file, keep only the valid lines, and split each line into fields:"
},
{
"metadata": {
"trusted": true,
"input_collapsed": false,
"collapsed": false,
"id": "0B174315E38649758CE56243FE7AAAB5"
},
"cell_type": "code",
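"source": "import org.apache.spark.rdd.RDD\n\nval lines: RDD[String] = sc.textFile(\"/tmp/quotes_clean.csv\")\n// keep only non-empty lines that start with a letter (drops blank or malformed rows)\nval validLines: RDD[String] = lines.filter(line => !line.isEmpty && line.head.isLetter)\n// split each CSV line into its comma-separated fields\nval fields: RDD[Array[String]] = validLines.map(line => line.split(\",\"))",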
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "lines: org.apache.spark.rdd.RDD[String] = /tmp/quotes_clean.csv MapPartitionsRDD[3] at textFile at <console>:77\nvalidLines: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[4] at filter at <console>:78\nfields: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[5] at map at <console>:79\n"
},
{
"metadata": {},
"data": {
"text/html": ""
},
"output_type": "execute_result",
"execution_count": 3,
"time": "Took: 621 milliseconds, at 2016-12-16 12:6"
}
]
},
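{
"metadata": {
"id": "6C1E0B9A4F2D47E8B3A5D7C9E1F20A41"
},
"cell_type": "markdown",
"source": "The `isLetter` filter above drops blank lines, but it does not check how many fields a line has: a truncated line such as `NYSE` would pass the filter, and indexing it later with `record(1)` would throw an `ArrayIndexOutOfBoundsException`. Below is a minimal, more defensive sketch. It assumes, based on the sample shown earlier, that every well-formed record has exactly seven comma-separated fields; `expectedFieldCount`, `rawLines` and `wellFormedFields` are illustrative names, not part of the original analysis."
},
{
"metadata": {
"trusted": true,
"input_collapsed": false,
"collapsed": false,
"id": "9D4B2E7F1A3C46B89E0D5F7A2C4B6E13"
},
"cell_type": "code",
"source": "import org.apache.spark.rdd.RDD\n\n// Assumption: a well-formed record has exactly 7 fields, as in the sample above\nval expectedFieldCount = 7\n\nval rawLines: RDD[String] = sc.textFile(\"/tmp/quotes_clean.csv\")\n// split(\",\", -1) keeps trailing empty fields, so a line like \"a,b,\" yields 3 fields instead of 2;\n// rows with any other field count (including blank lines) are dropped\nval wellFormedFields: RDD[Array[String]] = rawLines.map(_.split(\",\", -1)).filter(_.length == expectedFieldCount)",
"outputs": []
},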
{
"metadata": {
"id": "B2348E23A54B47FD80FB89E904F54521"
},
"cell_type": "markdown",
"source": "We are interested in the stock_symbol field, which is the element at index 1 of each (0-based) fields array:"
},
{
"metadata": {
"trusted": true,
"input_collapsed": false,
"collapsed": false,
"id": "214B1281035A458B8859C2D6FD711959"
},
"cell_type": "code",
"source": "val stockSymbols:RDD[String] = fields.map(record => record(1))\n",
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "stockSymbols: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[6] at map at <console>:83\n"
},
{
"metadata": {},
"data": {
"text/html": ""
},
"output_type": "execute_result",
"execution_count": 4,
"time": "Took: 668 milliseconds, at 2016-12-16 12:7"
}
]
},
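{
"metadata": {
"id": "4C9E1A7D3F5B48C2E6A0D2F4B6C8E0A1"
},
"cell_type": "markdown",
"source": "`record(1)` assumes every row has at least two fields. An alternative sketch uses the overload of `RDD.collect` that takes a partial function: it acts as a combined filter and map, keeping the second field of rows that match the pattern and silently skipping the rest. Unlike the no-argument `collect()`, this overload is a lazy transformation. `safeStockSymbols` is an illustrative name."
},
{
"metadata": {
"trusted": true,
"input_collapsed": false,
"collapsed": false,
"id": "F1D5B9E3A7C24F6A8D0B2E4C6A8F1B3D"
},
"cell_type": "code",
"source": "import org.apache.spark.rdd.RDD\n\n// Partial function: matches arrays with at least two elements and returns the second one;\n// rows with fewer fields do not match and are dropped instead of throwing\nval safeStockSymbols: RDD[String] = fields.collect { case Array(_, symbol, _*) => symbol }",
"outputs": []
},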
{
"metadata": {
"id": "09DFCE4D63E04D3F8B3C9908BD05DFA7"
},
"cell_type": "markdown",
"source": "To count the symbols, all that's left to do is call `count()`:"
},
{
"metadata": {
"trusted": true,
"input_collapsed": false,
"collapsed": false,
"id": "F5587BAF628A44829F36D3C62DA393F7"
},
"cell_type": "code",
"source": "val totalSymbolCount = stockSymbols.count()",
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "totalSymbolCount: Long = 68760\n"
},
{
"metadata": {},
"data": {
"text/html": ""
},
"output_type": "execute_result",
"execution_count": 6,
"time": "Took: 606 milliseconds, at 2016-12-16 12:8"
}
]
},
{
"metadata": {
"id": "CA2D5C1E2DB7431F912D026D55BE2145"
},
"cell_type": "markdown",
"source": "That's not very helpful: there is one symbol per record, so this is simply the number of records. We can check that:"
},
{
"metadata": {
"trusted": true,
"input_collapsed": false,
"collapsed": false,
"id": "70012C1CED894954855614486032A7AA"
},
"cell_type": "code",
"source": "validLines.count",
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "res9: Long = 68760\n"
},
{
"metadata": {},
"data": {
"text/html": "68760"
},
"output_type": "execute_result",
"execution_count": 7,
"time": "Took: 835 milliseconds, at 2016-12-16 12:9"
}
]
},
{
"metadata": {
"id": "8790B6D215624A2B8C38E40E431DE1D7"
},
"cell_type": "markdown",
"source": "### Slightly more interesting questions would be:\n\n- How many different stock symbols do we have?"
},
{
"metadata": {
"trusted": true,
"input_collapsed": false,
"collapsed": false,
"id": "C635F859D77B4758861AF7C150EFC27C"
},
"cell_type": "code",
"source": "val uniqueStockSymbols = stockSymbols.distinct.count()",
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "uniqueStockSymbols: Long = 30\n"
},
{
"metadata": {},
"data": {
"text/html": ""
},
"output_type": "execute_result",
"execution_count": 9,
"time": "Took: 749 milliseconds, at 2016-12-16 12:10"
}
]
},
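{
"metadata": {
"id": "2F8A6D1C3E5B49F7A0C2E4B6D8F1A3C5"
},
"cell_type": "markdown",
"source": "`distinct` shuffles the data before counting. When an estimate is good enough, `RDD.countApproxDistinct` returns an approximate number of distinct elements using a HyperLogLog-based sketch, which avoids that shuffle. A minimal sketch; the `relativeSD` value of 0.01 is an illustrative choice, not a tuned recommendation, and `approxUniqueSymbols` is an illustrative name:"
},
{
"metadata": {
"trusted": true,
"input_collapsed": false,
"collapsed": false,
"id": "7E3C9A1F5B2D48E6A9C0F3B5D7E1A2C4"
},
"cell_type": "code",
"source": "// relativeSD is the target relative accuracy of the estimate: smaller values are more\n// accurate but use more memory (Spark requires it to be greater than 0.000017)\nval approxUniqueSymbols: Long = stockSymbols.countApproxDistinct(relativeSD = 0.01)",
"outputs": []
},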
{
"metadata": {
"id": "D2CA406E9EA545648D07123B2CBF1B8B"
},
"cell_type": "markdown",
"source": "- How many records for each symbol do we have?"
},
{
"metadata": {
"trusted": true,
"input_collapsed": false,
"collapsed": false,
"id": "FD80A447D19744F0828223B0E2426616"
},
"cell_type": "code",
"source": "val countBySymbol = stockSymbols.map(s => (s,1)).reduceByKey(_+_)",
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "countBySymbol: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[11] at reduceByKey at <console>:85\n"
},
{
"metadata": {},
"data": {
"text/html": ""
},
"output_type": "execute_result",
"execution_count": 11,
"time": "Took: 598 milliseconds, at 2016-12-16 12:10"
}
]
},
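{
"metadata": {
"id": "B5D2F8A1C7E34B9680A2C4E6F8B1D3A7"
},
"cell_type": "markdown",
"source": "`reduceByKey` is a transformation, so `countBySymbol` has not been computed yet: the cell above only defines it. An action is needed to see the result. The sketch below shows two options: collecting the aggregated RDD to the driver, or calling `countByValue`, which performs the counting and returns a local `Map` directly. Both bring results to the driver, which is fine here only because there are just 30 distinct symbols. `topSymbols` and `symbolCounts` are illustrative names."
},
{
"metadata": {
"trusted": true,
"input_collapsed": false,
"collapsed": false,
"id": "C8A1E5F3B7D2460E9A1C3F5B7D9E2A4C"
},
"cell_type": "code",
"source": "// Action: bring the (small) per-symbol counts to the driver, sorted by count, largest first\nval topSymbols: Array[(String, Int)] = countBySymbol.collect().sortBy { case (_, n) => -n }\ntopSymbols.take(5).foreach(println)\n\n// Alternative: countByValue counts each distinct element and returns a Map on the driver\nval symbolCounts: scala.collection.Map[String, Long] = stockSymbols.countByValue()",
"outputs": []
},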
{
"metadata": {
"id": "653F2DA698E2483F8984AFAA563A0749"
},
"cell_type": "markdown",
"source": "# Using DataFrames\n### DataFrames provide a high-level abstraction that makes structured datasets easy to manipulate.\nSince Spark 2.0, CSV support for DataFrames and Datasets is available out of the box (`sparkSession.read.csv`).\n\nOur data has no header row with the field names (which is common in large datasets), so we need to provide the column names ourselves with `toDF`:"
},
{
"metadata": {
"trusted": true,
"input_collapsed": false,
"collapsed": false,
"id": "AA939D1B82CF439090F6091C5438C2AD"
},
"cell_type": "code",
"source": "val stockDF = sparkSession.read.csv(\"/tmp/quotes_clean.csv\").toDF(\"exchange\", \"symbol\", \"date\", \"open\", \"close\", \"volume\", \"price\") \n",
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "stockDF: org.apache.spark.sql.DataFrame = [exchange: string, symbol: string ... 5 more fields]\n"
},
{
"metadata": {},
"data": {
"text/html": ""
},
"output_type": "execute_result",
"execution_count": 14,
"time": "Took: 582 milliseconds, at 2016-12-16 12:16"
}
]
},
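{
"metadata": {
"id": "1B7E4C9F2A6D45B3E8C0A2F4D6B8E1C3"
},
"cell_type": "markdown",
"source": "Without a header or a schema, `read.csv` loads every column as a string. If typed columns are needed, a schema can be supplied up front with `DataFrameReader.schema`. The sketch below reuses the column names from `toDF` above and picks types based on the sample rows (prices as doubles, volume as a long, the `M/d/yyyy` date kept as text); these types are assumptions, and `quoteSchema` and `typedStockDF` are illustrative names."
},
{
"metadata": {
"trusted": true,
"input_collapsed": false,
"collapsed": false,
"id": "E4A9C2F7B1D54E8A6C3F0B2D4A6C8E1F"
},
"cell_type": "code",
"source": "import org.apache.spark.sql.types._\n\n// Assumed column types, based on the sample rows shown at the top of the notebook\nval quoteSchema = StructType(Seq(\n  StructField(\"exchange\", StringType),\n  StructField(\"symbol\", StringType),\n  StructField(\"date\", StringType), // e.g. 2/5/2016, kept as text\n  StructField(\"open\", DoubleType),\n  StructField(\"close\", DoubleType),\n  StructField(\"volume\", LongType),\n  StructField(\"price\", DoubleType)\n))\n\n// With an explicit schema, Spark parses each column into the given type instead of String\nval typedStockDF = sparkSession.read.schema(quoteSchema).csv(\"/tmp/quotes_clean.csv\")\ntypedStockDF.printSchema()",
"outputs": []
},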
{
"metadata": {
"id": "4857AFF08E894A64995D709EC5BB9836"
},
"cell_type": "markdown",
"source": "We can now answer our questions very easily:"
},
{
"metadata": {
"trusted": true,
"input_collapsed": false,
"collapsed": false,
"id": "B0F9C2623C7E421E9D94DD8AA6A21C08"
},
"cell_type": "code",
"source": "val uniqueSymbols = stockDF.select(\"symbol\").distinct().count\n",
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "uniqueSymbols: Long = 30\n"
},
{
"metadata": {},
"data": {
"text/html": ""
},
"output_type": "execute_result",
"execution_count": 19,
"time": "Took: 965 milliseconds, at 2016-12-16 12:18"
}
]
},
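{
"metadata": {
"id": "5A3F8D1E6B9C42A7D0E3F5A7C9B1D2E4"
},
"cell_type": "markdown",
"source": "The same answer can also be computed as a single aggregation with `countDistinct` from `org.apache.spark.sql.functions`, which keeps the result as a one-row DataFrame until it is read back. A minimal sketch; `distinctSymbolCount` is an illustrative name:"
},
{
"metadata": {
"trusted": true,
"input_collapsed": false,
"collapsed": false,
"id": "8F2C6A0E4B7D41C9A3E5F7B9D1C3A5E6"
},
"cell_type": "code",
"source": "import org.apache.spark.sql.functions.countDistinct\n\n// A one-row DataFrame holding the number of distinct symbols;\n// first().getLong(0) extracts that value on the driver\nval distinctSymbolCount: Long = stockDF.agg(countDistinct(\"symbol\")).first().getLong(0)",
"outputs": []
},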
{
"metadata": {
"trusted": true,
"input_collapsed": false,
"collapsed": false,
"id": "E10CE60276B14411A09451FD4D149D9D"
},
"cell_type": "code",
"source": "import org.apache.spark.sql.functions.count\n\nval recordsPerSymbol = stockDF.groupBy($\"symbol\").agg(count($\"symbol\"))",
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "recordsPerSymbol: org.apache.spark.sql.DataFrame = [symbol: string, count(symbol): bigint]\n"
},
{
"metadata": {},
"data": {
"text/html": ""
},
"output_type": "execute_result",
"execution_count": 25,
"time": "Took: 427 milliseconds, at 2016-12-16 12:22"
}
]
},
{
"metadata": {
"trusted": true,
"input_collapsed": false,
"collapsed": false,
"id": "2CF51A0AAA2747B68E9B0B126405A2FD"
},
"cell_type": "code",
"source": "recordsPerSymbol.show()",
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "+------+-------------+\n|symbol|count(symbol)|\n+------+-------------+\n| MMM| 2292|\n| AXP| 2292|\n| AAPL| 2292|\n| CSCO| 2292|\n| XOM| 2292|\n| JPM| 2292|\n| DIS| 2292|\n| INTC| 2292|\n| MRK| 2292|\n| PG| 2292|\n| MCD| 2292|\n| VZ| 2292|\n| DD| 2292|\n| TRV| 2292|\n| PFE| 2292|\n| UNH| 2292|\n| IBM| 2292|\n| JNJ| 2292|\n| CVX| 2292|\n| WMT| 2292|\n+------+-------------+\nonly showing top 20 rows\n\n"
},
{
"metadata": {},
"data": {
"text/html": ""
},
"output_type": "execute_result",
"execution_count": 26,
"time": "Took: 1 second 233 milliseconds, at 2016-12-16 12:22"
}
]
},
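{
"metadata": {
"id": "D7B3E9A5C1F246D8B0A4C6E8F2A1D3B5"
},
"cell_type": "markdown",
"source": "The generated column name `count(symbol)` is awkward to reference in later steps, and `show()` prints the groups in no particular order (and only the first 20 rows). A minimal sketch that names the count column and sorts by it; `records` and `sortedRecordsPerSymbol` are illustrative names:"
},
{
"metadata": {
"trusted": true,
"input_collapsed": false,
"collapsed": false,
"id": "A2E8C4F0B6D349A1C7E3F5B9D0A2C4E8"
},
"cell_type": "code",
"source": "import org.apache.spark.sql.functions.{asc, count, desc}\n\n// Name the aggregate column explicitly, then sort by it (ties broken alphabetically by symbol)\nval sortedRecordsPerSymbol = stockDF.groupBy(\"symbol\").agg(count(\"symbol\").as(\"records\")).orderBy(desc(\"records\"), asc(\"symbol\"))\n// There are 30 distinct symbols, so show(30) prints every group\nsortedRecordsPerSymbol.show(30)",
"outputs": []
},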
{
"metadata": {
"trusted": true,
"input_collapsed": false,
"collapsed": true,
"id": "C23B545DBE3349658B6FD756B0524F12"
},
"cell_type": "code",
"source": "",
"outputs": []
}
],
"nbformat": 4
}