{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Assumptions\n",
"- Running a recent version of Mac OS X\n",
"- Comfortable with the CLI"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Set up\n", | |
"\n", | |
"## Install Mac OSX CLI package manager\n", | |
"- Install homebrew (http://brew.sh/)\n", | |
"\n", | |
"## Install Python package manager\n", | |
"\n", | |
"```$ brew install pip```\n", | |
"\n", | |
"## Install and start a one-node Riak TS cluster\n", | |
"```$ brew install riak-ts``` (custom homebrew recipe--not publically available yet)\n", | |
"\n", | |
"```$ riak start```\n", | |
"\n", | |
"## Install Spark\n", | |
"```$ brew install apache-spark```\n", | |
"\n", | |
"## Download the Spark Connector Jar\n", | |
"```$ mkdir -p /tmp/basho```\n", | |
"```$ wget -nc -O /tmp/basho/spark-riak-connector-1.3.0-beta1-uber.jar https://bintray.com/artifact/download/basho/data-platform/com/basho/riak/spark-riak-connector/1.3.0-beta1/spark-riak-connector-1.3.0-beta1-uber.jar```\n", | |
"\n", | |
"## Install Jupyter Notebook\n", | |
"```$ pip install jupyter```\n", | |
"\n", | |
"## Start Jupyter\n", | |
"```$ SPARK_CLASSPATH=/tmp/basho/spark-riak-connector-1.3.0-beta1-uber.jar jupyter notebook```\n", | |
"\n", | |
"## Load this notebook into Jupyter\n", | |
"- File > Open... > pyspark-riak-ts.ipynb" | |
]
},
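{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Verify the Riak TS node (optional)\n",
"\n",
"Before continuing, it can be worth confirming that the local node actually started; `riak ping` should answer `pong`:\n",
"\n",
"```$ riak ping```"
]
},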
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Import Python dependencies"
]
},
{
"cell_type": "code",
"execution_count": 309,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"import riak, datetime, time, random"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Set up the Riak TS cluster location"
]
},
{
"cell_type": "code",
"execution_count": 310,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"host='127.0.0.1'\n",
"pb_port = '8087'\n",
"hostAndPort = \":\".join([host, pb_port])\n",
"\n",
"client = riak.RiakClient(host=host, pb_port=pb_port)"
]
},
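{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Optional: check connectivity to the Riak TS node\n",
"\n",
"A minimal sanity check before creating any tables: `RiakClient.ping()` returns `True` when the node is reachable on the configured protobuf port (this cell is a sketch and was not part of the original run)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Should print True if the one-node cluster from the set up section is running\n",
"print client.ping()"
]
},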
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Set up a Riak TS table object"
]
},
{
"cell_type": "code",
"execution_count": 311,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"table_name = 'pyspark-%d' % int(time.time())\n",
"table = client.table(table_name)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Create the table schema and activate the table"
]
},
{
"cell_type": "code",
"execution_count": 312,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CREATE TABLE pyspark-1458254031 (\n",
"site varchar not null,\n",
"species varchar not null,\n",
"measurementDate timestamp not null,\n",
"value double, \n",
"PRIMARY KEY ((site, species, quantum(measurementDate, 24, h)),\n",
" site, species, measurementDate))\n",
"\n"
]
}
],
"source": [
"create_sql = \"\"\"CREATE TABLE %(table_name)s (\n",
"site varchar not null,\n",
"species varchar not null,\n",
"measurementDate timestamp not null,\n",
"value double, \n",
"PRIMARY KEY ((site, species, quantum(measurementDate, 24, h)),\n",
" site, species, measurementDate))\n",
"\"\"\" % ({'table_name': table_name})\n",
"\n",
"print create_sql\n",
"\n",
"result = table.query(create_sql)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Show the table schema"
]
},
{
"cell_type": "code",
"execution_count": 313,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"['site', 'varchar', False, 1L, 1L]\n",
"['species', 'varchar', False, 2L, 2L]\n",
"['measurementDate', 'timestamp', False, 3L, 3L]\n",
"['value', 'double', True, None, None]\n"
]
}
],
"source": [
"schema = table.describe().rows\n",
"\n",
"for r in schema:\n",
"    print r"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Load some generated test data into the table"
]
},
{
"cell_type": "code",
"execution_count": 314,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"['AA', 'fff', 1458254032, 51.74653193964629]\n",
"['AA', 'fff', 1458254033, 33.46475093562373]\n",
"['AA', 'fff', 1458254034, 100.12799337602472]\n",
"['AA', 'fff', 1458254035, 33.95855390122852]\n",
"['AA', 'fff', 1458254036, 5.046943060843933]\n",
"['AA', 'fff', 1458254037, 25.252698805775097]\n",
"['AA', 'fff', 1458254038, 1.9910417067368549]\n",
"['AA', 'fff', 1458254039, -3.3164738228128314]\n",
"['AA', 'fff', 1458254040, 104.54752899969354]\n"
]
}
],
"source": [
"site = 'AA'\n",
"species = 'fff'\n",
"\n",
"start_date = int(time.time())\n",
"\n",
"events = []\n",
"for i in range(9):\n",
"    measurementDate = start_date + i\n",
"    value = random.uniform(-20, 110)\n",
"    events.append([site, species, measurementDate, value])\n",
"\n",
"end_date = measurementDate\n",
"\n",
"result = table.new(events).store()\n",
"\n",
"for e in events:\n",
"    print e\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Read the test data and confirm that the data has been written to Riak TS"
]
},
{
"cell_type": "code",
"execution_count": 315,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"SELECT *\n",
"FROM pyspark-1458254031\n",
"WHERE measurementDate > 1458254032\n",
"AND measurementDate < 1458254040\n",
"AND site = 'AA'\n",
"AND species = 'fff'\n",
"\n",
"['AA', 'fff', datetime.datetime(1970, 1, 17, 21, 4, 14, 33000), 33.46475093562373]\n",
"['AA', 'fff', datetime.datetime(1970, 1, 17, 21, 4, 14, 34000), 100.12799337602472]\n",
"['AA', 'fff', datetime.datetime(1970, 1, 17, 21, 4, 14, 35000), 33.95855390122852]\n",
"['AA', 'fff', datetime.datetime(1970, 1, 17, 21, 4, 14, 36000), 5.046943060843933]\n",
"['AA', 'fff', datetime.datetime(1970, 1, 17, 21, 4, 14, 37000), 25.252698805775097]\n",
"['AA', 'fff', datetime.datetime(1970, 1, 17, 21, 4, 14, 38000), 1.9910417067368549]\n",
"['AA', 'fff', datetime.datetime(1970, 1, 17, 21, 4, 14, 39000), -3.3164738228128314]\n"
]
}
],
"source": [
"select_sql = \"\"\"SELECT *\n",
"FROM %(table_name)s\n",
"WHERE measurementDate > %(start_date)s\n",
"AND measurementDate < %(end_date)s\n",
"AND site = '%(site)s'\n",
"AND species = '%(species)s'\n",
"\"\"\" % ({'table_name': table_name, 'start_date': start_date, 'end_date': end_date, 'site': site, 'species': species})\n",
"\n",
"print select_sql\n",
"\n",
"result = table.query(select_sql)\n",
"\n",
"for r in result.rows:\n",
"    print r\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Set up PySpark dependencies"
]
},
{
"cell_type": "code",
"execution_count": 316,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"spark.app.name=pyspark-shell\n",
"spark.master=local[*]\n",
"spark.submit.deployMode=client\n"
]
}
],
"source": [
"import findspark\n",
"findspark.init()\n",
"\n",
"import pyspark\n",
"\n",
"conf = pyspark.SparkConf()\n",
"print conf.toDebugString()\n",
"\n",
"sc = pyspark.SparkContext(conf=conf)\n",
"sqlContext = pyspark.SQLContext(sc)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Run a quick test to confirm Spark is working (independent of Riak TS)"
]
},
{
"cell_type": "code",
"execution_count": 317,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"[0, 2, 3, 4, 6]"
]
},
"execution_count": 317,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"sc.parallelize([0, 2, 3, 4, 6], 5).collect()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Use Riak TS as a SparkSQL data source and read the Riak TS table into a Spark DataFrame"
]
},
{
"cell_type": "code",
"execution_count": 318,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"df = sqlContext.read\\\n",
" .format(\"org.apache.spark.sql.riak\")\\\n",
" .option(\"spark.riak.connection.host\", hostAndPort)\\\n",
" .option(\"spark.riakts.bindings.timestamp\", \"useLong\")\\\n",
" .load(table_name)\\\n",
" .filter(\"\"\"measurementDate > %(start_date)s\n",
" AND measurementDate < %(end_date)s\n",
" AND site = '%(site)s'\n",
" AND species = '%(species)s'\n",
" \"\"\" % ({'start_date': start_date, 'end_date': end_date, 'site': site, 'species': species}))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Show the schema and contents of the Spark DataFrame"
]
},
{
"cell_type": "code",
"execution_count": 319,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"root\n",
" |-- site: string (nullable = false)\n",
" |-- species: string (nullable = false)\n",
" |-- measurementDate: long (nullable = false)\n",
" |-- value: double (nullable = true)\n",
"\n",
"+----+-------+---------------+-------------------+\n",
"|site|species|measurementDate| value|\n",
"+----+-------+---------------+-------------------+\n",
"| AA| fff| 1458254033| 33.46475093562373|\n",
"| AA| fff| 1458254034| 100.12799337602472|\n",
"| AA| fff| 1458254035| 33.95855390122852|\n",
"| AA| fff| 1458254036| 5.046943060843933|\n",
"| AA| fff| 1458254037| 25.252698805775097|\n",
"| AA| fff| 1458254038| 1.9910417067368549|\n",
"| AA| fff| 1458254039|-3.3164738228128314|\n",
"+----+-------+---------------+-------------------+\n",
"\n"
]
}
],
"source": [
"df.printSchema()\n",
"\n",
"df.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Register a temp table so SparkSQL can query the DataFrame, then run a quick aggregation"
]
},
{
"cell_type": "code",
"execution_count": 320,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+------------------+\n",
"| average_value|\n",
"+------------------+\n",
"|28.075072566202863|\n",
"+------------------+\n",
"\n"
]
}
],
"source": [
"df.registerTempTable(\"pyspark_tmp\")\n",
"\n",
"sqlContext.sql(\"select avg(value) as average_value from pyspark_tmp\").show()"
]
},
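{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Optional: a few more SparkSQL aggregations\n",
"\n",
"A short sketch of other standard SparkSQL aggregates run against the same `pyspark_tmp` temp table; this cell was not part of the original run, and its output will vary with the generated data."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# min, max and row count over the same filtered window of test data\n",
"sqlContext.sql(\"select min(value) as min_value, max(value) as max_value, count(*) as row_count from pyspark_tmp\").show()"
]
},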
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Stop the SparkContext"
]
},
{
"cell_type": "code",
"execution_count": 321,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"sc.stop()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Delete the test data"
]
},
{
"cell_type": "code",
"execution_count": 322,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Deleted - ['AA', 'fff', datetime.datetime(1970, 1, 17, 21, 4, 14, 32000)]\n",
"Deleted - ['AA', 'fff', datetime.datetime(1970, 1, 17, 21, 4, 14, 33000)]\n",
"Deleted - ['AA', 'fff', datetime.datetime(1970, 1, 17, 21, 4, 14, 34000)]\n",
"Deleted - ['AA', 'fff', datetime.datetime(1970, 1, 17, 21, 4, 14, 35000)]\n",
"Deleted - ['AA', 'fff', datetime.datetime(1970, 1, 17, 21, 4, 14, 36000)]\n",
"Deleted - ['AA', 'fff', datetime.datetime(1970, 1, 17, 21, 4, 14, 37000)]\n",
"Deleted - ['AA', 'fff', datetime.datetime(1970, 1, 17, 21, 4, 14, 38000)]\n",
"Deleted - ['AA', 'fff', datetime.datetime(1970, 1, 17, 21, 4, 14, 39000)]\n",
"Deleted - ['AA', 'fff', datetime.datetime(1970, 1, 17, 21, 4, 14, 40000)]\n"
]
}
],
"source": [
"stream = table.stream_keys()\n",
"\n",
"for keys in stream:\n",
"    for k in sorted(keys):\n",
"        print 'Deleted - %s' % k\n",
"        table.delete(k)\n",
"\n",
"stream.close()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 2",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.10"
}
},
"nbformat": 4,
"nbformat_minor": 0
}