Created
August 1, 2016 18:26
-
-
Save natbusa/96f2ad39656f639e23cfb464ff8cc309 to your computer and use it in GitHub Desktop.
Oriole - Anomaly detection and pattern extraction with Spark, Cassandra and Scala
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"cells": [ | |
{ | |
"cell_type": "code", | |
"execution_count": 126, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"Starting download from https://github.com/natalinobusa/nak/raw/master/nak_2.10-1.3.jar\n", | |
"Finished download of nak_2.10-1.3.jar\n" | |
] | |
} | |
], | |
"source": [ | |
"%addjar https://github.com/natalinobusa/nak/raw/master/nak_2.10-1.3.jar" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"#### Setup\n", | |
"\n", | |
"This notebook is running scala code and iterfaces to a Spark cluster using the [Apache Toree](https://toree.incubator.apache.org/) project. Furthermore this notebook shows how to extract pattern from data stored in Cassandra and how to write back model and results in Cassandra tables. Spark interfaces to Cassandra via the [Cassandra-Spark connector](https://github.com/datastax/spark-cassandra-connector).\n", | |
"\n", | |
"At the time of compiling this notebook, Spark 1.6.1 and Cassandra 3.5 were used.\n", | |
"Here below the command to install the Spark - Scala Kernel on Jupiter. More instructions on this topic are available on Apache Toree [website](https://toree.incubator.apache.org/) and [github pages](https://github.com/apache/incubator-toree)\n", | |
"\n", | |
"```shell\n", | |
"sudo /opt/local/Library/Frameworks/Python.framework/Versions/3.5/bin/jupyter-toree install --spark_home=${SPARK_HOME} --spark_opts='--packages com.datastax.spark:spark-cassandra-connector_2.10:1.6.0,graphframes:graphframes:0.1.0-spark1.6 --conf spark.cassandra.connection.host=localhost --conf spark.executor.memory=6g --conf spark.driver.memory=6g'\n", | |
"```" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 1, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"1.6.1" | |
] | |
}, | |
"execution_count": 1, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"// Scala version\n", | |
"sc.version" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 2, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [], | |
"source": [ | |
"//sql context\n", | |
"import org.apache.spark.sql.SQLContext\n", | |
"import org.apache.spark.sql.functions._\n", | |
"\n", | |
"val sqlContext = new SQLContext(sc)\n", | |
"import sqlContext.implicits._" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 3, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [], | |
"source": [ | |
"// spark-cassandra connector\n", | |
"import com.datastax.spark.connector._\n", | |
"import com.datastax.spark.connector.cql._\n", | |
"\n", | |
"import org.apache.spark.sql.cassandra.CassandraSQLContext\n", | |
"val cc = new CassandraSQLContext(sc)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 4, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [], | |
"source": [ | |
"val events = cc.sql(\"\"\"select ts, uid, lat, lon, vid from lbsn.events where\n", | |
" lon>-74.2589 and lon<-73.7004 and \n", | |
" lat> 40.4774 and lat< 40.9176\n", | |
" \"\"\").as(\"events\")\n", | |
"\n", | |
"val venues = cc.sql(\"select vid, name from lbsn.venues\").as(\"venues\")" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 5, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"138948" | |
] | |
}, | |
"execution_count": 5, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"events.count()" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 6, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"[2009-12-17 14:58:23.0,83942,40.7586191119,-73.9888966084,225371]" | |
] | |
}, | |
"execution_count": 6, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"events.first()" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 7, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"30366" | |
] | |
}, | |
"execution_count": 7, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"venues.count()" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 8, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"[1073929,Sputnik Gallery]" | |
] | |
}, | |
"execution_count": 8, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"venues.first()" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 9, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"27" | |
] | |
}, | |
"execution_count": 9, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"// User 0: how many checkins?\n", | |
"events.where(\"uid=0\").count()" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 10, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"[7239827,Central Park Manhattan ]" | |
] | |
}, | |
"execution_count": 10, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"venues.where(\"vid = 7239827\").first()" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 11, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"+-------+--------------------+\n", | |
"|vid |name |\n", | |
"+-------+--------------------+\n", | |
"|1073929|Sputnik Gallery |\n", | |
"|1350193|Kaori's Closet Tokyo|\n", | |
"|1425555|Club Spain |\n", | |
"|7160165|La Quinta - Brooklyn|\n", | |
"|285750 |La Sorrentina |\n", | |
"+-------+--------------------+\n", | |
"only showing top 5 rows\n", | |
"\n" | |
] | |
} | |
], | |
"source": [ | |
"venues.show(5, false)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 12, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"+---------------------+------+-------------+--------------+-------+--------------------+\n", | |
"|ts |uid |lat |lon |vid |name |\n", | |
"+---------------------+------+-------------+--------------+-------+--------------------+\n", | |
"|2009-12-17 14:58:23.0|83942 |40.7586191119|-73.9888966084|225371 |Smith's Bar & Grill |\n", | |
"|2010-08-18 03:08:36.0|166583|40.697517133 |-74.175481333 |1276055|Jake's Coffeehouse |\n", | |
"|2010-09-14 14:15:22.0|41568 |40.7538465143|-73.9845085144|1308099|The Southwest Porch |\n", | |
"|2009-12-04 16:24:03.0|74122 |40.721437025 |-73.9978015423|15998 |La Esquina |\n", | |
"|2009-12-04 16:23:01.0|74122 |40.7211204821|-73.9982306578|32674 |Blip.tv Headquarters|\n", | |
"+---------------------+------+-------------+--------------+-------+--------------------+\n", | |
"only showing top 5 rows\n", | |
"\n" | |
] | |
} | |
], | |
"source": [ | |
"val df = events.\n", | |
" join(venues, events(\"events.vid\") === venues(\"venues.vid\"), \"inner\").\n", | |
" select(\"ts\", \"uid\", \"lat\", \"lon\", \"events.vid\",\"venues.name\")\n", | |
"\n", | |
"df.show(5,false)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 13, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"112382" | |
] | |
}, | |
"execution_count": 13, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"// filter new york\n", | |
"val df_ny = df.filter($\"lon\" > -74.2589 and $\"lon\" < -73.7004 and $\"lat\" > 40.4774 and $\"lat\" < 40.9176)\n", | |
"df_ny.count()" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 14, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [], | |
"source": [ | |
"// UDF functions for SQL-like operations on columns\n", | |
"import org.joda.time.DateTime\n", | |
"import org.joda.time.DateTimeZone\n", | |
"\n", | |
"import java.sql.Timestamp\n", | |
"import org.apache.spark.sql.functions.udf\n", | |
"\n", | |
"val dayofweek = udf( (ts: Timestamp, tz: String) => {\n", | |
" val dt = new DateTime(ts,DateTimeZone.forID(tz))\n", | |
" // sunday starts at 0\n", | |
" dt.getDayOfWeek() - 1\n", | |
"})\n", | |
"\n", | |
"val localhour = udf( (ts: Timestamp, tz: String) => {\n", | |
" val dt = new DateTime(ts,DateTimeZone.forID(tz))\n", | |
" dt.getHourOfDay()\n", | |
"})" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 15, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"+---------------------+------+-------------+--------------+-------+--------------------+---+----+\n", | |
"|ts |uid |lat |lon |vid |name |dow|hour|\n", | |
"+---------------------+------+-------------+--------------+-------+--------------------+---+----+\n", | |
"|2009-12-17 14:58:23.0|83942 |40.7586191119|-73.9888966084|225371 |Smith's Bar & Grill |3 |17 |\n", | |
"|2010-08-18 03:08:36.0|166583|40.697517133 |-74.175481333 |1276055|Jake's Coffeehouse |2 |6 |\n", | |
"|2010-09-14 14:15:22.0|41568 |40.7538465143|-73.9845085144|1308099|The Southwest Porch |1 |17 |\n", | |
"|2009-12-04 16:24:03.0|74122 |40.721437025 |-73.9978015423|15998 |La Esquina |4 |19 |\n", | |
"|2009-12-04 16:23:01.0|74122 |40.7211204821|-73.9982306578|32674 |Blip.tv Headquarters|4 |19 |\n", | |
"+---------------------+------+-------------+--------------+-------+--------------------+---+----+\n", | |
"only showing top 5 rows\n", | |
"\n" | |
] | |
} | |
], | |
"source": [ | |
"val newyork_tz = \"America/New_York\"\n", | |
"\n", | |
"val df = df_ny.\n", | |
" withColumn(\"dow\", dayofweek($\"ts\", lit(newyork_tz))).\n", | |
" withColumn(\"hour\", localhour($\"ts\", lit(newyork_tz))).\n", | |
" as(\"events\").cache()\n", | |
" \n", | |
"df.show(5, false)\n" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 16, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"+---+-----+\n", | |
"|dow|count|\n", | |
"+---+-----+\n", | |
"| 0|14640|\n", | |
"| 1|15385|\n", | |
"| 2|15686|\n", | |
"| 3|16179|\n", | |
"| 4|16558|\n", | |
"| 5|18085|\n", | |
"| 6|15849|\n", | |
"+---+-----+\n", | |
"\n" | |
] | |
} | |
], | |
"source": [ | |
"// histogram day of the week events\n", | |
"df.groupBy($\"dow\").count().show()" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 17, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"+----+-----+\n", | |
"|hour|count|\n", | |
"+----+-----+\n", | |
"|0 |2681 |\n", | |
"|1 |1272 |\n", | |
"|2 |852 |\n", | |
"|3 |554 |\n", | |
"|4 |323 |\n", | |
"|5 |550 |\n", | |
"|6 |937 |\n", | |
"|7 |2157 |\n", | |
"|8 |3615 |\n", | |
"|9 |4586 |\n", | |
"|10 |4903 |\n", | |
"|11 |5576 |\n", | |
"|12 |7257 |\n", | |
"|13 |8142 |\n", | |
"|14 |7687 |\n", | |
"|15 |7254 |\n", | |
"|16 |7519 |\n", | |
"|17 |7495 |\n", | |
"|18 |8621 |\n", | |
"|19 |8970 |\n", | |
"|20 |7167 |\n", | |
"|21 |6127 |\n", | |
"|22 |4747 |\n", | |
"|23 |3390 |\n", | |
"+----+-----+\n", | |
"\n" | |
] | |
} | |
], | |
"source": [ | |
"// histogram hour of the day events\n", | |
"df.groupBy($\"hour\").count().show(24,false)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 18, | |
"metadata": { | |
"collapsed": true | |
}, | |
"outputs": [], | |
"source": [ | |
"import breeze.linalg._\n", | |
"import breeze.linalg.DenseVector\n", | |
"\n", | |
"import org.apache.spark.mllib.linalg.{Vector,Vectors}" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 19, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [], | |
"source": [ | |
"// vector histogram: the RDD way\n", | |
"\n", | |
"def toVector(i: Int, length:Int) = {\n", | |
" DenseVector((0 to length-1).map(x => if (x == i) 1.0 else 0.0).toArray)\n", | |
"}" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 20, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"(225371,DenseVector(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0))" | |
] | |
}, | |
"execution_count": 20, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"df.select($\"vid\", $\"dow\").map(r => (r.getLong(0),toVector(r.getInt(1), 7))).first()" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 21, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [], | |
"source": [ | |
"val dow_hist = df.\n", | |
" select($\"vid\", $\"dow\").\n", | |
" map(r => (r.getLong(0),toVector(r.getInt(1), 7))).\n", | |
" reduceByKey(_ + _).\n", | |
" mapValues(x => Vectors.dense((x / sum(x)).toArray)).\n", | |
" toDF(\"vid\", \"dow_hist\")\n", | |
"\n", | |
" " | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 22, | |
"metadata": { | |
"collapsed": false, | |
"scrolled": true | |
}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"\r", | |
" \r", | |
"+-------+-------------------------------+\n", | |
"|vid |dow_hist |\n", | |
"+-------+-------------------------------+\n", | |
"|713995 |[0.25,0.0,0.0,0.0,0.5,0.25,0.0]|\n", | |
"|4415530|[0.0,0.0,0.0,0.0,0.0,1.0,0.0] |\n", | |
"|486265 |[0.25,0.0,0.0,0.0,0.0,0.5,0.25]|\n", | |
"+-------+-------------------------------+\n", | |
"only showing top 3 rows\n", | |
"\n" | |
] | |
} | |
], | |
"source": [ | |
"dow_hist.show(3, false)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 23, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"+---------------------+------+-------------+--------------+-------+---+-------------------------------+\n", | |
"|ts |uid |lat |lon |vid |dow|dow_hist |\n", | |
"+---------------------+------+-------------+--------------+-------+---+-------------------------------+\n", | |
"|2010-03-15 04:45:53.0|155999|40.7131656401|-74.0353098807|713995 |0 |[0.25,0.0,0.0,0.0,0.5,0.25,0.0]|\n", | |
"|2010-06-19 13:08:21.0|41235 |40.7131656401|-74.0353098807|713995 |5 |[0.25,0.0,0.0,0.0,0.5,0.25,0.0]|\n", | |
"|2010-04-23 20:47:34.0|10687 |40.7131656401|-74.0353098807|713995 |4 |[0.25,0.0,0.0,0.0,0.5,0.25,0.0]|\n", | |
"|2010-10-08 04:53:51.0|74222 |40.7131656401|-74.0353098807|713995 |4 |[0.25,0.0,0.0,0.0,0.5,0.25,0.0]|\n", | |
"|2010-09-25 10:02:38.0|181811|40.76603735 |-73.78952235 |4415530|5 |[0.0,0.0,0.0,0.0,0.0,1.0,0.0] |\n", | |
"+---------------------+------+-------------+--------------+-------+---+-------------------------------+\n", | |
"only showing top 5 rows\n", | |
"\n" | |
] | |
} | |
], | |
"source": [ | |
"val df_probs = df.\n", | |
" join(dow_hist, df(\"vid\") === dow_hist(\"vid\"), \"inner\").\n", | |
" select(\"ts\", \"uid\", \"lat\", \"lon\", \"events.vid\", \"dow\", \"dow_hist\").\n", | |
" cache()\n", | |
"\n", | |
"df_probs.show(5,false)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 24, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"+---------------------+------+------+---+--------+\n", | |
"|ts |uid |vid |dow|dow_prob|\n", | |
"+---------------------+------+------+---+--------+\n", | |
"|2010-03-15 04:45:53.0|155999|713995|0 |0.25 |\n", | |
"|2010-06-19 13:08:21.0|41235 |713995|5 |0.25 |\n", | |
"|2010-04-23 20:47:34.0|10687 |713995|4 |0.5 |\n", | |
"+---------------------+------+------+---+--------+\n", | |
"only showing top 3 rows\n", | |
"\n" | |
] | |
} | |
], | |
"source": [ | |
"val nth = udf( (i:Int, arr: Vector) => {\n", | |
" arr.toArray.lift(i).getOrElse(0.0)\n", | |
"})\n", | |
"\n", | |
"df_probs.select($\"ts\", $\"uid\", $\"vid\", $\"dow\", nth($\"dow\", $\"dow_hist\").as(\"dow_prob\")).show(3,false)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 25, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"+-------+---------------------------------------------------------------------------------------------------+\n", | |
"|vid |hour_hist |\n", | |
"+-------+---------------------------------------------------------------------------------------------------+\n", | |
"|713995 |[0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.5,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.25,0.0,0.0,0.0,0.0,0.0,0.0,0.25]|\n", | |
"|4415530|[0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0] |\n", | |
"|486265 |[0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.5,0.0,0.0,0.0,0.0,0.25,0.0,0.25,0.0,0.0,0.0,0.0]|\n", | |
"+-------+---------------------------------------------------------------------------------------------------+\n", | |
"only showing top 3 rows\n", | |
"\n" | |
] | |
} | |
], | |
"source": [ | |
"// same for hour of the day\n", | |
"\n", | |
"val hour_hist = df.\n", | |
" select($\"vid\", $\"hour\").\n", | |
" map(r => (r.getLong(0),toVector(r.getInt(1), 24))).\n", | |
" reduceByKey(_ + _).\n", | |
" mapValues(x => Vectors.dense((x / sum(x)).toArray)).\n", | |
" toDF(\"vid\", \"hour_hist\")\n", | |
"\n", | |
"hour_hist.show(3, false)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 26, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"\r", | |
" \r", | |
"+---------------------+------+-------------+--------------+-----+--------------------+-------------------+\n", | |
"|ts |uid |lat |lon |vid |hour_prob |dow_prob |\n", | |
"+---------------------+------+-------------+--------------+-----+--------------------+-------------------+\n", | |
"|2010-08-13 23:28:29.0|125327|40.7490532543|-73.9680397511|11831|0.015873015873015872|0.19047619047619047|\n", | |
"|2010-09-29 12:08:36.0|578 |40.7490532543|-73.9680397511|11831|0.12698412698412698 |0.2222222222222222 |\n", | |
"|2010-09-05 13:20:27.0|578 |40.7490532543|-73.9680397511|11831|0.047619047619047616|0.12698412698412698|\n", | |
"|2010-07-21 07:02:07.0|578 |40.7490532543|-73.9680397511|11831|0.07936507936507936 |0.2222222222222222 |\n", | |
"|2010-07-20 14:24:06.0|578 |40.7490532543|-73.9680397511|11831|0.07936507936507936 |0.14285714285714285|\n", | |
"+---------------------+------+-------------+--------------+-----+--------------------+-------------------+\n", | |
"only showing top 5 rows\n", | |
"\n" | |
] | |
} | |
], | |
"source": [ | |
"val df_probs = df.\n", | |
" join(dow_hist, df(\"vid\") === dow_hist(\"vid\"), \"inner\").\n", | |
" join(hour_hist, df(\"vid\") === hour_hist(\"vid\"), \"inner\").\n", | |
" select( \n", | |
" $\"ts\", \n", | |
" $\"uid\", \n", | |
" $\"lat\", \n", | |
" $\"lon\", \n", | |
" $\"events.vid\", \n", | |
" nth($\"hour\", $\"hour_hist\").as(\"hour_prob\"), \n", | |
" nth($\"dow\", $\"dow_hist\").as(\"dow_prob\")).\n", | |
" cache()\n", | |
"\n", | |
"df_probs.show(5,false)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 27, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [], | |
"source": [ | |
"// process mining\n", | |
"val g_df = events.\n", | |
" select($\"ts\", $\"uid\", $\"vid\").\n", | |
" rdd.\n", | |
" map(row => (row.getLong(1), List( (row.getTimestamp(0), row.getLong(2)) ))).\n", | |
" reduceByKey(_ ++ _).\n", | |
" mapValues( x =>\n", | |
" x.sortWith(_._1.getTime < _._1.getTime).\n", | |
" map(_._2)\n", | |
" ).\n", | |
" mapValues(_.sliding(2).toList).\n", | |
" flatMap(_._2).\n", | |
" map(\n", | |
" _ match {\n", | |
" case List(a, b) => Some((a, b))\n", | |
" case _ => None\n", | |
" }).\n", | |
" flatMap(x => x).\n", | |
" toDF(\"src\", \"dst\").\n", | |
" cache()" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 28, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"\r", | |
" \r", | |
"+------+------+\n", | |
"| src| dst|\n", | |
"+------+------+\n", | |
"|255148|603177|\n", | |
"|603177|603177|\n", | |
"|603177|603177|\n", | |
"|603177|603177|\n", | |
"|603177|188022|\n", | |
"+------+------+\n", | |
"only showing top 5 rows\n", | |
"\n" | |
] | |
} | |
], | |
"source": [ | |
"g_df.show(5)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 39, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"+------+------+------+\n", | |
"| src| dst|weight|\n", | |
"+------+------+------+\n", | |
"| 12505| 12505| 240.0|\n", | |
"| 13022| 13022| 188.0|\n", | |
"| 23261| 23261| 163.0|\n", | |
"| 24963| 24963| 154.0|\n", | |
"|400740|400740| 84.0|\n", | |
"+------+------+------+\n", | |
"only showing top 5 rows\n", | |
"\n" | |
] | |
} | |
], | |
"source": [ | |
"//.withColumn(\"weight\", $\"count\".cast(\"double\")).drop(\"count\").filter($\"weight\" > 1.0)\n", | |
"val edges_df = g_df.groupBy($\"src\",$\"dst\").count().withColumn(\"weight\", $\"count\".cast(\"double\")).drop(\"count\").cache()\n", | |
"edges_df.orderBy(desc(\"weight\")).show(5)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 40, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"110235" | |
] | |
}, | |
"execution_count": 40, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"edges_df.count()" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 41, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"21438" | |
] | |
}, | |
"execution_count": 41, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"val nodes_df = edges_df.select($\"src\").distinct().unionAll(edges_df.select($\"dst\").distinct()).distinct().toDF(\"id\").cache()\n", | |
"nodes_df.count()" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 42, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [], | |
"source": [ | |
"import org.graphframes.GraphFrame\n", | |
"\n", | |
"val g = GraphFrame(nodes_df, edges_df)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 63, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [], | |
"source": [ | |
"val results = g.pageRank.resetProbability(0.05).maxIter(5).run()" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 64, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"+------+------------------+---------------------------------+\n", | |
"|vid |pagerank |name |\n", | |
"+------+------------------+---------------------------------+\n", | |
"|12505 |27.5777391608751 |LGA LaGuardia Airport |\n", | |
"|23261 |26.066573954316933|JFK John F. Kennedy International|\n", | |
"|11844 |21.850881240333695|Times Square |\n", | |
"|13022 |17.556352719881815|Grand Central Terminal |\n", | |
"|24963 |16.261458952885967|EWR Newark Liberty International |\n", | |
"|11875 |10.109985490864041|Madison Square Garden |\n", | |
"|12525 |10.074957734472436|The Museum of Modern Art (MoMA) |\n", | |
"|11720 |9.90862194915849 |Yankee Stadium |\n", | |
"|106840|9.685021905149734 |Union Square |\n", | |
"|11834 |8.874649776756282 |Bryant Park |\n", | |
"|12313 |8.087135577458076 |Empire State Building |\n", | |
"|14151 |6.953063528162026 |Rockefeller Center |\n", | |
"|17710 |6.917629638959865 |Madison Square Park |\n", | |
"+------+------------------+---------------------------------+\n", | |
"only showing top 13 rows\n", | |
"\n" | |
] | |
} | |
], | |
"source": [ | |
"val vertices = results.vertices.select(\"id\", \"pagerank\")\n", | |
"val popular_venues = vertices.join(venues, vertices(\"id\") === venues(\"vid\"), \"inner\").select(\"vid\", \"pagerank\", \"name\")\n", | |
"\n", | |
"popular_venues.sort($\"pagerank\".desc).show(13, false)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 97, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [], | |
"source": [ | |
"val e_df = events.\n", | |
" select(\"uid\",\"lat\",\"lon\").\n", | |
" rdd.map(row => (row.getLong(0), Array(row.getDouble(1), row.getDouble(2))) ).\n", | |
" reduceByKey( _ ++ _).\n", | |
" mapValues(v => new DenseMatrix(v.length/2,2,v, 0, 2, true))\n" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 118, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [], | |
"source": [ | |
"def formatUserEvents(x: Tuple2[Long, DenseMatrix[Double]]) : Unit = {\n", | |
" val arr = x._2\n", | |
" val slice = arr(0 to min(5,arr.rows)-1, ::)\n", | |
" println(s\"uid = ${x._1}\")\n", | |
" println(s\"events count = ${arr.rows}\")\n", | |
" println(\"lat,lon = \")\n", | |
" println(slice)\n", | |
" if (arr.rows > 5) println(s\"... ${arr.rows- 5} more rows\")\n", | |
" println(\"-\"*30)\n", | |
"}" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 120, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"uid = 33590\n", | |
"events count = 355\n", | |
"lat,lon = \n", | |
"40.735486752 -73.979792352 \n", | |
"40.73723724 -73.980824105 \n", | |
"40.7424578176 -73.9837482386 \n", | |
"40.70736247 -74.008827565 \n", | |
"40.737524907 -73.978869923 \n", | |
"... 350 more rows\n", | |
"------------------------------\n", | |
"uid = 139605\n", | |
"events count = 4\n", | |
"lat,lon = \n", | |
"40.7126315333 -74.0444852833 \n", | |
"40.6993813387 -74.0393972397 \n", | |
"40.69856285 -74.03974055 \n", | |
"40.6987988833 -74.0400195 \n", | |
"------------------------------\n", | |
"uid = 108050\n", | |
"events count = 1\n", | |
"lat,lon = \n", | |
"40.7425115937 -74.0060305595 \n", | |
"------------------------------\n" | |
] | |
} | |
], | |
"source": [ | |
"e_df.take(3).foreach(e => formatUserEvents(e))" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 124, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [], | |
"source": [ | |
"import breeze.numerics._\n", | |
"import breeze.linalg._\n", | |
"\n", | |
"def euclideanDistance (a: DenseVector[Double], b: DenseVector[Double]) = norm(a-b, 2)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 134, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [], | |
"source": [ | |
"import nak.cluster._\n", | |
"import nak.cluster.GDBSCAN._\n", | |
"\n", | |
"def dbscan(v : breeze.linalg.DenseMatrix[Double]) = {\n", | |
"\n", | |
" val gdbscan = new GDBSCAN(\n", | |
" DBSCAN.getNeighbours(epsilon = 0.1, distance=euclideanDistance),\n", | |
" DBSCAN.isCorePoint(minPoints = 3)\n", | |
" )\n", | |
"\n", | |
" val clusters = gdbscan cluster v\n", | |
" \n", | |
" // reducing the clusters to bounding boxes\n", | |
" // for simplicity: each user could \n", | |
" clusters.map(\n", | |
" cluster => (\n", | |
" cluster.id.toInt, \n", | |
" cluster.points.size, \n", | |
" cluster.points.map(_.value(0)).min,\n", | |
" cluster.points.map(_.value(1)).min,\n", | |
" cluster.points.map(_.value(0)).max,\n", | |
" cluster.points.map(_.value(1)).max\n", | |
" )\n", | |
" )\n", | |
"}" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 136, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [], | |
"source": [ | |
"val bboxRdd = e_df.mapValues(dbscan(_))" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 150, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"+-----+---+-----+-------------+--------------+-------------+--------------+\n", | |
"| uid|cid|csize| lat_min| lon_min| lat_max| lon_max|\n", | |
"+-----+---+-----+-------------+--------------+-------------+--------------+\n", | |
"|33590| 66| 6| 40.837230222| -73.88030725|40.8538124533| -73.782456383|\n", | |
"|76360| 51| 2|40.8117304816|-74.0675711632|40.8117304816|-74.0675711632|\n", | |
"| 4890| 2| 85|40.7008291704|-74.0347717925| 40.792863965|-73.9727404494|\n", | |
"| 4890| 85| 3|40.6891968241|-74.0472292833|40.6993813387|-74.0393972397|\n", | |
"|32640| 1| 12|40.6394147833|-74.0765082836|40.7058475346| -73.98080408|\n", | |
"|32640| 2| 12|40.7113103667| -74.0023141|40.7317873406| -73.94982115|\n", | |
"| 6220| 1| 65| 40.70358305| -74.009206295|40.7636900022|-73.9719665051|\n", | |
"| 6220| 56| 2| 40.85011295|-73.9520859718|40.8514833821|-73.9468287333|\n", | |
"|10100| 1| 59|40.7190166905| -74.008749967|40.7805414319| -73.964475333|\n", | |
"| 4605| 2| 4|40.7208271761|-73.9988958836| 40.752604|-73.9944867037|\n", | |
"+-----+---+-----+-------------+--------------+-------------+--------------+\n", | |
"only showing top 10 rows\n", | |
"\n" | |
] | |
} | |
], | |
"source": [ | |
"val bbox_df = bboxRdd.\n", | |
" flatMapValues(x => x).\n", | |
" map(x => (x._1, x._2._1, x._2._2,x._2._3,x._2._4,x._2._5,x._2._6)).\n", | |
" toDF(\"uid\", \"cid\", \"csize\", \"lat_min\", \"lon_min\", \"lat_max\", \"lon_max\").\n", | |
" filter($\"cid\" > 0)\n", | |
"\n", | |
"bbox_df.show(10)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": { | |
"collapsed": true | |
}, | |
"outputs": [], | |
"source": [ | |
"pagerank\n", | |
"popularity\n" | |
] | |
} | |
], | |
"metadata": { | |
"kernelspec": { | |
"display_name": "Apache Toree - Scala", | |
"language": "scala", | |
"name": "apache_toree_scala" | |
}, | |
"language_info": { | |
"name": "scala", | |
"version": "2.10.4" | |
} | |
}, | |
"nbformat": 4, | |
"nbformat_minor": 0 | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment