Created
December 16, 2016 11:24
-
-
Save maasg/7b8a4991ba9e2c236ddd8dfd823352cc to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| { | |
| "metadata": { | |
| "name": "stockQuotes", | |
| "user_save_timestamp": "1970-01-01T01:00:00.000Z", | |
| "auto_save_timestamp": "1970-01-01T01:00:00.000Z", | |
| "language_info": { | |
| "name": "scala", | |
| "file_extension": "scala", | |
| "codemirror_mode": "text/x-scala" | |
| }, | |
| "trusted": true, | |
| "customLocalRepo": null, | |
| "customRepos": null, | |
| "customDeps": null, | |
| "customImports": null, | |
| "customArgs": null, | |
| "customSparkConf": null | |
| }, | |
| "cells": [ | |
| { | |
| "metadata": { | |
| "id": "FC49BEFBEF0740F0AA69845BD5E00F1D" | |
| }, | |
| "cell_type": "markdown", | |
| "source": "Notebook created with the [Spark Notebook](http://spark-notebook.io)" | |
| }, | |
| { | |
| "metadata": { | |
| "id": "503909B6C8F44F9795190C3F4F960EB2" | |
| }, | |
| "cell_type": "markdown", | |
| "source": "# Extracting ticker name from stock quotes csv" | |
| }, | |
| { | |
| "metadata": { | |
| "id": "1E5CDEC106F34CC88DEC3F1B8BE3904B" | |
| }, | |
| "cell_type": "markdown", | |
| "source": "We're going to work with a sample dataset in this format:\n```\nexchange, stock_symbol, date, open, close, volume, price\n```\n\nLet's see some data first:" | |
| }, | |
| { | |
| "metadata": { | |
| "trusted": true, | |
| "input_collapsed": false, | |
| "collapsed": false, | |
| "presentation": { | |
| "tabs_state": "{\n \"tab_id\": \"#tab1016190705-0\"\n}", | |
| "pivot_chart_state": "{\n \"hiddenAttributes\": [],\n \"menuLimit\": 200,\n \"cols\": [],\n \"rows\": [],\n \"vals\": [],\n \"exclusions\": {},\n \"inclusions\": {},\n \"unusedAttrsVertical\": 85,\n \"autoSortUnusedAttrs\": false,\n \"inclusionsInfo\": {},\n \"aggregatorName\": \"Count\",\n \"rendererName\": \"Table\"\n}" | |
| }, | |
| "id": "20E0A52D823240CE8D3290118424A538" | |
| }, | |
| "cell_type": "code", | |
| "source": "val dataset = sparkContext.textFile(\"/tmp/quotes_clean.csv\")\ndataset.take(10)\n ", | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": "dataset: org.apache.spark.rdd.RDD[String] = /tmp/quotes_clean.csv MapPartitionsRDD[1] at textFile at <console>:77\nres3: Array[String] = Array(NYSE,DIA,2/5/2016,164.14,163.87,310868,163.78, NYSE,AAPL,2/5/2016,96.6000,96.5100,2046336,96.2899, NYSE,AXP,2/5/2016,54.38,54.72,545726,54.74, NYSE,BA,2/5/2016,123.61,123.71,186882,123.69, NYSE,CAT,2/5/2016,65.96,65.67,386350,65.75, NYSE,CSCO,2/5/2016,23.540,23.510,1689617,23.535, NYSE,CVX,2/5/2016,84.79,83.94,558915,83.82, NYSE,DD,2/5/2016,59.10,59.29,158011,59.05, NYSE,DIS,2/5/2016,95.430,95.320,301590,95.275, NYSE,GE,2/5/2016,29.180,29.160,1651780,29.015)\n" | |
| }, | |
| { | |
| "metadata": {}, | |
| "data": { | |
| "text/html": "<div>\n <script data-this=\"{"dataId":"anon3c9cd36b15acc0aa0ec5fa1375026158","dataInit":[],"genId":"1016190705"}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/req(['../javascripts/notebook/playground','../javascripts/notebook/magic/tabs'], \n function(playground, _magictabs) {\n // data ==> data-this (in observable.js's scopedEval) ==> this in JS => { dataId, dataInit, ... }\n // this ==> scope (in observable.js's scopedEval) ==> this.parentElement ==> div.container below (toHtml)\n\n playground.call(data,\n this\n ,\n {\n \"f\": _magictabs,\n \"o\": {}\n }\n \n \n \n );\n }\n );/*]]>*/</script>\n <div>\n <div>\n <ul class=\"nav nav-tabs\" id=\"ul1016190705\"><li>\n <a href=\"#tab1016190705-0\"><i class=\"fa fa-table\"/></a>\n </li><li>\n <a href=\"#tab1016190705-1\"><i class=\"fa fa-cubes\"/></a>\n </li></ul>\n\n <div class=\"tab-content\" id=\"tab1016190705\"><div class=\"tab-pane\" id=\"tab1016190705-0\">\n <div>\n <script data-this=\"{"dataId":"anon7ee7503329b372f3e683ebb183ca6947","dataInit":[{"string value":"NYSE,DIA,2/5/2016,164.14,163.87,310868,163.78"},{"string value":"NYSE,AAPL,2/5/2016,96.6000,96.5100,2046336,96.2899"},{"string value":"NYSE,AXP,2/5/2016,54.38,54.72,545726,54.74"},{"string value":"NYSE,BA,2/5/2016,123.61,123.71,186882,123.69"},{"string value":"NYSE,CAT,2/5/2016,65.96,65.67,386350,65.75"},{"string value":"NYSE,CSCO,2/5/2016,23.540,23.510,1689617,23.535"},{"string value":"NYSE,CVX,2/5/2016,84.79,83.94,558915,83.82"},{"string value":"NYSE,DD,2/5/2016,59.10,59.29,158011,59.05"},{"string value":"NYSE,DIS,2/5/2016,95.430,95.320,301590,95.275"},{"string value":"NYSE,GE,2/5/2016,29.180,29.160,1651780,29.015"}],"genId":"1979128498"}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/req(['../javascripts/notebook/playground','../javascripts/notebook/magic/tableChart'], \n function(playground, _magictableChart) {\n // data ==> data-this (in observable.js's scopedEval) ==> this in JS => { dataId, dataInit, ... 
}\n // this ==> scope (in observable.js's scopedEval) ==> this.parentElement ==> div.container below (toHtml)\n\n playground.call(data,\n this\n ,\n {\n \"f\": _magictableChart,\n \"o\": {\"headers\":[\"string value\"],\"width\":600,\"height\":400}\n }\n \n \n \n );\n }\n );/*]]>*/</script>\n <div>\n <span class=\"chart-total-item-count\"><p data-bind=\"text: value\"><script data-this=\"{"valueId":"anon3f3969e0c7e1a8ddb40773757777606e","initialValue":"10"}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/\nreq(\n['observable', 'knockout'],\nfunction (O, ko) {\n ko.applyBindings({\n value: O.makeObservable(valueId, initialValue)\n },\n this\n );\n});\n /*]]>*/</script></p> entries total</span>\n <span class=\"chart-sampling-warning\"><p data-bind=\"text: value\"><script data-this=\"{"valueId":"anon43571856ad0b4bcf6dc4e35a45948bba","initialValue":""}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/\nreq(\n['observable', 'knockout'],\nfunction (O, ko) {\n ko.applyBindings({\n value: O.makeObservable(valueId, initialValue)\n },\n this\n );\n});\n /*]]>*/</script></p></span>\n <div>\n </div>\n </div></div>\n </div><div class=\"tab-pane\" id=\"tab1016190705-1\">\n <div>\n <script data-this=\"{"dataId":"anone48afae8f76566c756723ddf5da75972","dataInit":[{"string value":"NYSE,DIA,2/5/2016,164.14,163.87,310868,163.78"},{"string value":"NYSE,AAPL,2/5/2016,96.6000,96.5100,2046336,96.2899"},{"string value":"NYSE,AXP,2/5/2016,54.38,54.72,545726,54.74"},{"string value":"NYSE,BA,2/5/2016,123.61,123.71,186882,123.69"},{"string value":"NYSE,CAT,2/5/2016,65.96,65.67,386350,65.75"},{"string value":"NYSE,CSCO,2/5/2016,23.540,23.510,1689617,23.535"},{"string value":"NYSE,CVX,2/5/2016,84.79,83.94,558915,83.82"},{"string value":"NYSE,DD,2/5/2016,59.10,59.29,158011,59.05"},{"string value":"NYSE,DIS,2/5/2016,95.430,95.320,301590,95.275"},{"string value":"NYSE,GE,2/5/2016,29.180,29.160,1651780,29.015"}],"genId":"403611917"}\" 
type=\"text/x-scoped-javascript\">/*<![CDATA[*/req(['../javascripts/notebook/playground','../javascripts/notebook/magic/pivotChart'], \n function(playground, _magicpivotChart) {\n // data ==> data-this (in observable.js's scopedEval) ==> this in JS => { dataId, dataInit, ... }\n // this ==> scope (in observable.js's scopedEval) ==> this.parentElement ==> div.container below (toHtml)\n\n playground.call(data,\n this\n ,\n {\n \"f\": _magicpivotChart,\n \"o\": {\"width\":600,\"height\":400,\"derivedAttributes\":{},\"extraOptions\":{}}\n }\n \n \n \n );\n }\n );/*]]>*/</script>\n <div>\n <span class=\"chart-total-item-count\"><p data-bind=\"text: value\"><script data-this=\"{"valueId":"anon61da9f52d8c06c4dc59a6abbc8e8c86e","initialValue":"10"}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/\nreq(\n['observable', 'knockout'],\nfunction (O, ko) {\n ko.applyBindings({\n value: O.makeObservable(valueId, initialValue)\n },\n this\n );\n});\n /*]]>*/</script></p> entries total</span>\n <span class=\"chart-sampling-warning\"><p data-bind=\"text: value\"><script data-this=\"{"valueId":"anon80afba80ef3705ae29707b030b3246fe","initialValue":""}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/\nreq(\n['observable', 'knockout'],\nfunction (O, ko) {\n ko.applyBindings({\n value: O.makeObservable(valueId, initialValue)\n },\n this\n );\n});\n /*]]>*/</script></p></span>\n <div>\n </div>\n </div></div>\n </div></div>\n </div>\n </div></div>" | |
| }, | |
| "output_type": "execute_result", | |
| "execution_count": 2, | |
| "time": "Took: 1 second 736 milliseconds, at 2016-12-16 12:4" | |
| } | |
| ] | |
| }, | |
| { | |
| "metadata": { | |
| "id": "80D6970759B843469E0BCBF301ECB569" | |
| }, | |
| "cell_type": "markdown", | |
| "source": "### First, we read the CSV file:" | |
| }, | |
| { | |
| "metadata": { | |
| "trusted": true, | |
| "input_collapsed": false, | |
| "collapsed": false, | |
| "id": "0B174315E38649758CE56243FE7AAAB5" | |
| }, | |
| "cell_type": "code", | |
| "source": "val lines: RDD[String] = sc.textFile(\"/tmp/quotes_clean.csv\")\nval validLines: RDD[String] = lines.filter(line => !line.isEmpty && line.head.isLetter)\nval fields: RDD[Array[String]] = validLines.map(line => line.split(\",\"))", | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": "lines: org.apache.spark.rdd.RDD[String] = /tmp/quotes_clean.csv MapPartitionsRDD[3] at textFile at <console>:77\nvalidLines: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[4] at filter at <console>:78\nfields: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[5] at map at <console>:79\n" | |
| }, | |
| { | |
| "metadata": {}, | |
| "data": { | |
| "text/html": "" | |
| }, | |
| "output_type": "execute_result", | |
| "execution_count": 3, | |
| "time": "Took: 621 milliseconds, at 2016-12-16 12:6" | |
| } | |
| ] | |
| }, | |
| { | |
| "metadata": { | |
| "id": "B2348E23A54B47FD80FB89E904F54521" | |
| }, | |
| "cell_type": "markdown", | |
| "source": "We are interested in the stock_symbol field, which is the element at index 1 of each (0-based) array of fields:" | |
| }, | |
| { | |
| "metadata": { | |
| "trusted": true, | |
| "input_collapsed": false, | |
| "collapsed": false, | |
| "id": "214B1281035A458B8859C2D6FD711959" | |
| }, | |
| "cell_type": "code", | |
| "source": "val stockSymbols:RDD[String] = fields.map(record => record(1))\n", | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": "stockSymbols: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[6] at map at <console>:83\n" | |
| }, | |
| { | |
| "metadata": {}, | |
| "data": { | |
| "text/html": "" | |
| }, | |
| "output_type": "execute_result", | |
| "execution_count": 4, | |
| "time": "Took: 668 milliseconds, at 2016-12-16 12:7" | |
| } | |
| ] | |
| }, | |
| { | |
| "metadata": { | |
| "id": "09DFCE4D63E04D3F8B3C9908BD05DFA7" | |
| }, | |
| "cell_type": "markdown", | |
| "source": "If we want to count the symbols, all that's left to do is to issue a count:" | |
| }, | |
| { | |
| "metadata": { | |
| "trusted": true, | |
| "input_collapsed": false, | |
| "collapsed": false, | |
| "id": "F5587BAF628A44829F36D3C62DA393F7" | |
| }, | |
| "cell_type": "code", | |
| "source": "val totalSymbolCount = stockSymbols.count()", | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": "totalSymbolCount: Long = 68760\n" | |
| }, | |
| { | |
| "metadata": {}, | |
| "data": { | |
| "text/html": "" | |
| }, | |
| "output_type": "execute_result", | |
| "execution_count": 6, | |
| "time": "Took: 606 milliseconds, at 2016-12-16 12:8" | |
| } | |
| ] | |
| }, | |
| { | |
| "metadata": { | |
| "id": "CA2D5C1E2DB7431F912D026D55BE2145" | |
| }, | |
| "cell_type": "markdown", | |
| "source": "That's not very helpful because we have one entry for every record. We can check that:" | |
| }, | |
| { | |
| "metadata": { | |
| "trusted": true, | |
| "input_collapsed": false, | |
| "collapsed": false, | |
| "id": "70012C1CED894954855614486032A7AA" | |
| }, | |
| "cell_type": "code", | |
| "source": "validLines.count", | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": "res9: Long = 68760\n" | |
| }, | |
| { | |
| "metadata": {}, | |
| "data": { | |
| "text/html": "68760" | |
| }, | |
| "output_type": "execute_result", | |
| "execution_count": 7, | |
| "time": "Took: 835 milliseconds, at 2016-12-16 12:9" | |
| } | |
| ] | |
| }, | |
| { | |
| "metadata": { | |
| "id": "8790B6D215624A2B8C38E40E431DE1D7" | |
| }, | |
| "cell_type": "markdown", | |
| "source": "### Slightly more interesting questions would be:\n\n- How many different stock symbols do we have?" | |
| }, | |
| { | |
| "metadata": { | |
| "trusted": true, | |
| "input_collapsed": false, | |
| "collapsed": false, | |
| "id": "C635F859D77B4758861AF7C150EFC27C" | |
| }, | |
| "cell_type": "code", | |
| "source": "val uniqueStockSymbols = stockSymbols.distinct.count()", | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": "uniqueStockSymbols: Long = 30\n" | |
| }, | |
| { | |
| "metadata": {}, | |
| "data": { | |
| "text/html": "" | |
| }, | |
| "output_type": "execute_result", | |
| "execution_count": 9, | |
| "time": "Took: 749 milliseconds, at 2016-12-16 12:10" | |
| } | |
| ] | |
| }, | |
| { | |
| "metadata": { | |
| "id": "D2CA406E9EA545648D07123B2CBF1B8B" | |
| }, | |
| "cell_type": "markdown", | |
| "source": "- How many records for each symbol do we have?" | |
| }, | |
| { | |
| "metadata": { | |
| "trusted": true, | |
| "input_collapsed": false, | |
| "collapsed": false, | |
| "id": "FD80A447D19744F0828223B0E2426616" | |
| }, | |
| "cell_type": "code", | |
| "source": "val countBySymbol = stockSymbols.map(s => (s,1)).reduceByKey(_+_)", | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": "countBySymbol: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[11] at reduceByKey at <console>:85\n" | |
| }, | |
| { | |
| "metadata": {}, | |
| "data": { | |
| "text/html": "" | |
| }, | |
| "output_type": "execute_result", | |
| "execution_count": 11, | |
| "time": "Took: 598 milliseconds, at 2016-12-16 12:10" | |
| } | |
| ] | |
| }, | |
| { | |
| "metadata": { | |
| "id": "653F2DA698E2483F8984AFAA563A0749" | |
| }, | |
| "cell_type": "markdown", | |
| "source": "# Using DataFrames\n### DataFrames provide a high-level abstraction to easily manipulate datasets.\nIn Spark 2.0, CSV support for DataFrames and Datasets is available out of the box.\nGiven that our data does not have a header row with the field names (which is usual in large datasets), we will need to provide the column names:" | |
| }, | |
| { | |
| "metadata": { | |
| "trusted": true, | |
| "input_collapsed": false, | |
| "collapsed": false, | |
| "id": "AA939D1B82CF439090F6091C5438C2AD" | |
| }, | |
| "cell_type": "code", | |
| "source": "val stockDF = sparkSession.read.csv(\"/tmp/quotes_clean.csv\").toDF(\"exchange\", \"symbol\", \"date\", \"open\", \"close\", \"volume\", \"price\") \n", | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": "stockDF: org.apache.spark.sql.DataFrame = [exchange: string, symbol: string ... 5 more fields]\n" | |
| }, | |
| { | |
| "metadata": {}, | |
| "data": { | |
| "text/html": "" | |
| }, | |
| "output_type": "execute_result", | |
| "execution_count": 14, | |
| "time": "Took: 582 milliseconds, at 2016-12-16 12:16" | |
| } | |
| ] | |
| }, | |
| { | |
| "metadata": { | |
| "id": "4857AFF08E894A64995D709EC5BB9836" | |
| }, | |
| "cell_type": "markdown", | |
| "source": "We can answer our questions very easily now:" | |
| }, | |
| { | |
| "metadata": { | |
| "trusted": true, | |
| "input_collapsed": false, | |
| "collapsed": false, | |
| "id": "B0F9C2623C7E421E9D94DD8AA6A21C08" | |
| }, | |
| "cell_type": "code", | |
| "source": "val uniqueSymbols = stockDF.select(\"symbol\").distinct().count\n", | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": "uniqueSymbols: Long = 30\n" | |
| }, | |
| { | |
| "metadata": {}, | |
| "data": { | |
| "text/html": "" | |
| }, | |
| "output_type": "execute_result", | |
| "execution_count": 19, | |
| "time": "Took: 965 milliseconds, at 2016-12-16 12:18" | |
| } | |
| ] | |
| }, | |
| { | |
| "metadata": { | |
| "trusted": true, | |
| "input_collapsed": false, | |
| "collapsed": false, | |
| "id": "E10CE60276B14411A09451FD4D149D9D" | |
| }, | |
| "cell_type": "code", | |
| "source": "val recordsPerSymbol = stockDF.groupBy($\"symbol\").agg(count($\"symbol\"))", | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": "recordsPerSymbol: org.apache.spark.sql.DataFrame = [symbol: string, count(symbol): bigint]\n" | |
| }, | |
| { | |
| "metadata": {}, | |
| "data": { | |
| "text/html": "" | |
| }, | |
| "output_type": "execute_result", | |
| "execution_count": 25, | |
| "time": "Took: 427 milliseconds, at 2016-12-16 12:22" | |
| } | |
| ] | |
| }, | |
| { | |
| "metadata": { | |
| "trusted": true, | |
| "input_collapsed": false, | |
| "collapsed": false, | |
| "id": "2CF51A0AAA2747B68E9B0B126405A2FD" | |
| }, | |
| "cell_type": "code", | |
| "source": "recordsPerSymbol.show()", | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": "+------+-------------+\n|symbol|count(symbol)|\n+------+-------------+\n| MMM| 2292|\n| AXP| 2292|\n| AAPL| 2292|\n| CSCO| 2292|\n| XOM| 2292|\n| JPM| 2292|\n| DIS| 2292|\n| INTC| 2292|\n| MRK| 2292|\n| PG| 2292|\n| MCD| 2292|\n| VZ| 2292|\n| DD| 2292|\n| TRV| 2292|\n| PFE| 2292|\n| UNH| 2292|\n| IBM| 2292|\n| JNJ| 2292|\n| CVX| 2292|\n| WMT| 2292|\n+------+-------------+\nonly showing top 20 rows\n\n" | |
| }, | |
| { | |
| "metadata": {}, | |
| "data": { | |
| "text/html": "" | |
| }, | |
| "output_type": "execute_result", | |
| "execution_count": 26, | |
| "time": "Took: 1 second 233 milliseconds, at 2016-12-16 12:22" | |
| } | |
| ] | |
| }, | |
| { | |
| "metadata": { | |
| "trusted": true, | |
| "input_collapsed": false, | |
| "collapsed": true, | |
| "id": "C23B545DBE3349658B6FD756B0524F12" | |
| }, | |
| "cell_type": "code", | |
| "source": "", | |
| "outputs": [] | |
| } | |
| ], | |
| "nbformat": 4 | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment