@connectthefuture
Forked from mdigan/pyspark-riak-ts.ipynb
Created October 20, 2017 17:50
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Assumptions\n",
"- Running recent version of Mac OSX\n",
"- Comfortable with CLI"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Set up\n",
"\n",
"## Install Mac OSX CLI package manager\n",
"- Install homebrew (http://brew.sh/)\n",
"\n",
"## Install Python package manager\n",
"\n",
"```$ brew install pip```\n",
"\n",
"## Install and start a one-node Riak TS cluster\n",
"```$ brew install riak-ts``` (custom homebrew recipe--not publically available yet)\n",
"\n",
"```$ riak start```\n",
"\n",
"## Install Spark\n",
"```$ brew install apache-spark```\n",
"\n",
"## Download the Spark Connector Jar\n",
"```$ mkdir -p /tmp/basho```\n",
"```$ wget -nc -O /tmp/basho/spark-riak-connector-1.3.0-beta1-uber.jar https://bintray.com/artifact/download/basho/data-platform/com/basho/riak/spark-riak-connector/1.3.0-beta1/spark-riak-connector-1.3.0-beta1-uber.jar```\n",
"\n",
"## Install Jupyter Notebook\n",
"```$ pip install jupyter```\n",
"\n",
"## Start Jupyter\n",
"```$ SPARK_CLASSPATH=/tmp/basho/spark-riak-connector-1.3.0-beta1-uber.jar jupyter notebook```\n",
"\n",
"## Load this notebook into Jupyter\n",
"- File > Open... > pyspark-riak-ts.ipynb"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Import Python dependencies"
]
},
{
"cell_type": "code",
"execution_count": 309,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"import riak, datetime, time, random"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Set up the Riak TS cluster location"
]
},
{
"cell_type": "code",
"execution_count": 310,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"host='127.0.0.1'\n",
"pb_port = '8087'\n",
"hostAndPort = \":\".join([host, pb_port])\n",
"\n",
"client = riak.RiakClient(host=host, pb_port=pb_port)"
]
},
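{
"cell_type": "markdown",
"metadata": {},
"source": [
"Optional, not part of the original walkthrough: a minimal connectivity check, assuming the Python Riak client's `ping()` method and the local node started during setup."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Hedged sanity check (added, not in the original notebook):\n",
"# ping() returns True when the node at host:pb_port is reachable.\n",
"print client.ping()"
]
},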
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Set up a Riak TS table object"
]
},
{
"cell_type": "code",
"execution_count": 311,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"table_name = 'pyspark-%d' % int(time.time())\n",
"table = client.table(table_name)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Create the table schema and activate the table"
]
},
{
"cell_type": "code",
"execution_count": 312,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CREATE TABLE pyspark-1458254031 (\n",
"site varchar not null,\n",
"species varchar not null,\n",
"measurementDate timestamp not null,\n",
"value double, \n",
"PRIMARY KEY ((site, species, quantum(measurementDate, 24, h)),\n",
" site, species, measurementDate))\n",
"\n"
]
}
],
"source": [
"create_sql = \"\"\"CREATE TABLE %(table_name)s (\n",
"site varchar not null,\n",
"species varchar not null,\n",
"measurementDate timestamp not null,\n",
"value double, \n",
"PRIMARY KEY ((site, species, quantum(measurementDate, 24, h)),\n",
" site, species, measurementDate))\n",
"\"\"\" % ({'table_name': table_name})\n",
"\n",
"print create_sql\n",
"\n",
"result = table.query(create_sql)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Show the table schema"
]
},
{
"cell_type": "code",
"execution_count": 313,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"['site', 'varchar', False, 1L, 1L]\n",
"['species', 'varchar', False, 2L, 2L]\n",
"['measurementDate', 'timestamp', False, 3L, 3L]\n",
"['value', 'double', True, None, None]\n"
]
}
],
"source": [
"schema = table.describe().rows\n",
"\n",
"for r in schema:\n",
" print r"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Load some generated test data into the table"
]
},
{
"cell_type": "code",
"execution_count": 314,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"['AA', 'fff', 1458254032, 51.74653193964629]\n",
"['AA', 'fff', 1458254033, 33.46475093562373]\n",
"['AA', 'fff', 1458254034, 100.12799337602472]\n",
"['AA', 'fff', 1458254035, 33.95855390122852]\n",
"['AA', 'fff', 1458254036, 5.046943060843933]\n",
"['AA', 'fff', 1458254037, 25.252698805775097]\n",
"['AA', 'fff', 1458254038, 1.9910417067368549]\n",
"['AA', 'fff', 1458254039, -3.3164738228128314]\n",
"['AA', 'fff', 1458254040, 104.54752899969354]\n"
]
}
],
"source": [
"site = 'AA'\n",
"species = 'fff'\n",
"\n",
"start_date = int(time.time())\n",
"\n",
"events = []\n",
"for i in range(9):\n",
" measurementDate = start_date + i\n",
" value = random.uniform(-20, 110)\n",
" events.append([site, species, measurementDate, value])\n",
"\n",
"end_date = measurementDate \n",
" \n",
"result = table.new(events).store()\n",
"\n",
"for e in events:\n",
" print e\n"
]
},
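{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a hedged aside (not in the original walkthrough), a single row can also be fetched by its full primary key. This assumes the installed Python Riak client exposes the `ts_get` call and that it returns an object with a `rows` attribute, like the other table operations above."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Hedged example (assumption: client.ts_get is available in this client version):\n",
"# fetch the first generated row by its primary key [site, species, timestamp].\n",
"single = client.ts_get(table_name, [site, species, start_date])\n",
"for r in single.rows:\n",
"    print r"
]
},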
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Read the test data and confirm that the data has been written to Riak TS"
]
},
{
"cell_type": "code",
"execution_count": 315,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"SELECT *\n",
"FROM pyspark-1458254031\n",
"WHERE measurementDate > 1458254032\n",
"AND measurementDate < 1458254040\n",
"AND site = 'AA'\n",
"AND species = 'fff'\n",
"\n",
"['AA', 'fff', datetime.datetime(1970, 1, 17, 21, 4, 14, 33000), 33.46475093562373]\n",
"['AA', 'fff', datetime.datetime(1970, 1, 17, 21, 4, 14, 34000), 100.12799337602472]\n",
"['AA', 'fff', datetime.datetime(1970, 1, 17, 21, 4, 14, 35000), 33.95855390122852]\n",
"['AA', 'fff', datetime.datetime(1970, 1, 17, 21, 4, 14, 36000), 5.046943060843933]\n",
"['AA', 'fff', datetime.datetime(1970, 1, 17, 21, 4, 14, 37000), 25.252698805775097]\n",
"['AA', 'fff', datetime.datetime(1970, 1, 17, 21, 4, 14, 38000), 1.9910417067368549]\n",
"['AA', 'fff', datetime.datetime(1970, 1, 17, 21, 4, 14, 39000), -3.3164738228128314]\n"
]
}
],
"source": [
"select_sql = \"\"\"SELECT *\n",
"FROM %(table_name)s\n",
"WHERE measurementDate > %(start_date)s\n",
"AND measurementDate < %(end_date)s\n",
"AND site = '%(site)s'\n",
"AND species = '%(species)s'\n",
"\"\"\" % ({'table_name': table_name, 'start_date': start_date, 'end_date': end_date, 'site': site, 'species': species})\n",
"\n",
"print select_sql\n",
"\n",
"result = table.query(select_sql)\n",
"\n",
"for r in result.rows:\n",
" print r\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Set up PySpark dependencies"
]
},
{
"cell_type": "code",
"execution_count": 316,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"spark.app.name=pyspark-shell\n",
"spark.master=local[*]\n",
"spark.submit.deployMode=client\n"
]
}
],
"source": [
"import findspark\n",
"findspark.init()\n",
"\n",
"import pyspark\n",
"\n",
"conf = pyspark.SparkConf()\n",
"print conf.toDebugString()\n",
"\n",
"sc = pyspark.SparkContext(conf=conf)\n",
"sqlContext = pyspark.SQLContext(sc)"
]
},
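{
"cell_type": "markdown",
"metadata": {},
"source": [
"If Jupyter was not started with `SPARK_CLASSPATH` (see the setup section above), the connector jar can also be placed on the driver classpath through `SparkConf`. This is a hedged alternative sketch, not part of the original walkthrough; it only builds and prints a configuration, assumes the jar path downloaded earlier, and would need to be applied before a `SparkContext` is constructed to take effect."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Hedged alternative (assumption): put the connector uber-jar on the driver\n",
"# classpath via SparkConf instead of the SPARK_CLASSPATH environment variable.\n",
"alt_conf = pyspark.SparkConf()\n",
"alt_conf.set(\"spark.driver.extraClassPath\", \"/tmp/basho/spark-riak-connector-1.3.0-beta1-uber.jar\")\n",
"print alt_conf.toDebugString()"
]
},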
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Run a quick test to confirm Spark is working (independent of Riak TS)"
]
},
{
"cell_type": "code",
"execution_count": 317,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"[0, 2, 3, 4, 6]"
]
},
"execution_count": 317,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"sc.parallelize([0, 2, 3, 4, 6], 5).collect()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Use Riak TS as SparkSQL datasource and read data from the Riak TS table in a Spark Dataframe"
]
},
{
"cell_type": "code",
"execution_count": 318,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"df = sqlContext.read\\\n",
" .format(\"org.apache.spark.sql.riak\")\\\n",
" .option(\"spark.riak.connection.host\", hostAndPort)\\\n",
" .option(\"spark.riakts.bindings.timestamp\", \"useLong\")\\\n",
" .load(table_name)\\\n",
" .filter(\"\"\"measurementDate > %(start_date)s\n",
" AND measurementDate < %(end_date)s\n",
" AND site = '%(site)s'\n",
" AND species = '%(species)s'\n",
" \"\"\" % ({'start_date': start_date, 'end_date': end_date, 'site': site, 'species': species})) "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Show the schema of the Spark Dataframe and show the contents of the Spark Dataframe"
]
},
{
"cell_type": "code",
"execution_count": 319,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"root\n",
" |-- site: string (nullable = false)\n",
" |-- species: string (nullable = false)\n",
" |-- measurementDate: long (nullable = false)\n",
" |-- value: double (nullable = true)\n",
"\n",
"+----+-------+---------------+-------------------+\n",
"|site|species|measurementDate| value|\n",
"+----+-------+---------------+-------------------+\n",
"| AA| fff| 1458254033| 33.46475093562373|\n",
"| AA| fff| 1458254034| 100.12799337602472|\n",
"| AA| fff| 1458254035| 33.95855390122852|\n",
"| AA| fff| 1458254036| 5.046943060843933|\n",
"| AA| fff| 1458254037| 25.252698805775097|\n",
"| AA| fff| 1458254038| 1.9910417067368549|\n",
"| AA| fff| 1458254039|-3.3164738228128314|\n",
"+----+-------+---------------+-------------------+\n",
"\n"
]
}
],
"source": [
"df.printSchema()\n",
"\n",
"df.show()"
]
},
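{
"cell_type": "markdown",
"metadata": {},
"source": [
"The connector can also write a DataFrame back to Riak TS. The cell below is a hedged sketch that was not executed in the original notebook; it assumes the 1.3.0-beta1 connector accepts the same `org.apache.spark.sql.riak` format with append mode for writes. Because it re-writes the rows just read, the existing rows (same primary keys) are simply overwritten."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Hedged write-back sketch (assumption about the connector's write path):\n",
"# append the DataFrame's rows back to the same Riak TS table.\n",
"df.write\\\n",
"    .format(\"org.apache.spark.sql.riak\")\\\n",
"    .option(\"spark.riak.connection.host\", hostAndPort)\\\n",
"    .mode(\"append\")\\\n",
"    .save(table_name)"
]
},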
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Register a temp table in order to run SparkSQL on the dataframe and run a quick SparkSQL aggregation function"
]
},
{
"cell_type": "code",
"execution_count": 320,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+------------------+\n",
"| average_value|\n",
"+------------------+\n",
"|28.075072566202863|\n",
"+------------------+\n",
"\n"
]
}
],
"source": [
"df.registerTempTable(\"pyspark_tmp\")\n",
"\n",
"sqlContext.sql(\"select avg(value) as average_value from pyspark_tmp\").show()"
]
},
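{
"cell_type": "markdown",
"metadata": {},
"source": [
"A couple more aggregations over the same temp table, added as a small illustration. This is plain SparkSQL against the registered `pyspark_tmp` table and involves nothing connector-specific."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Additional SparkSQL over the registered temp table: per-(site, species)\n",
"# row count and the min/max of value.\n",
"sqlContext.sql(\"\"\"select site, species, count(*) as n,\n",
"    min(value) as min_value, max(value) as max_value\n",
"    from pyspark_tmp group by site, species\"\"\").show()"
]
},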
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Stop the SparkContext"
]
},
{
"cell_type": "code",
"execution_count": 321,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"sc.stop()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Delete the test data"
]
},
{
"cell_type": "code",
"execution_count": 322,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Deleted - ['AA', 'fff', datetime.datetime(1970, 1, 17, 21, 4, 14, 32000)]\n",
"Deleted - ['AA', 'fff', datetime.datetime(1970, 1, 17, 21, 4, 14, 33000)]\n",
"Deleted - ['AA', 'fff', datetime.datetime(1970, 1, 17, 21, 4, 14, 34000)]\n",
"Deleted - ['AA', 'fff', datetime.datetime(1970, 1, 17, 21, 4, 14, 35000)]\n",
"Deleted - ['AA', 'fff', datetime.datetime(1970, 1, 17, 21, 4, 14, 36000)]\n",
"Deleted - ['AA', 'fff', datetime.datetime(1970, 1, 17, 21, 4, 14, 37000)]\n",
"Deleted - ['AA', 'fff', datetime.datetime(1970, 1, 17, 21, 4, 14, 38000)]\n",
"Deleted - ['AA', 'fff', datetime.datetime(1970, 1, 17, 21, 4, 14, 39000)]\n",
"Deleted - ['AA', 'fff', datetime.datetime(1970, 1, 17, 21, 4, 14, 40000)]\n"
]
}
],
"source": [
"stream = table.stream_keys()\n",
"\n",
"for keys in stream:\n",
" for k in sorted(keys):\n",
" print 'Deleted - %s' % k\n",
" table.delete(k)\n",
"\n",
"stream.close()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 2",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.10"
}
},
"nbformat": 4,
"nbformat_minor": 0
}