@natbusa
Created August 1, 2016 18:26
Oriole - Anomaly detection and pattern extraction with Spark, Cassandra and Scala
{
"cells": [
{
"cell_type": "code",
"execution_count": 126,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Starting download from https://github.com/natalinobusa/nak/raw/master/nak_2.10-1.3.jar\n",
"Finished download of nak_2.10-1.3.jar\n"
]
}
],
"source": [
"%addjar https://github.com/natalinobusa/nak/raw/master/nak_2.10-1.3.jar"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Setup\n",
"\n",
"This notebook runs Scala code and interfaces to a Spark cluster through the [Apache Toree](https://toree.incubator.apache.org/) project. It also shows how to extract patterns from data stored in Cassandra and how to write models and results back to Cassandra tables. Spark talks to Cassandra via the [Cassandra-Spark connector](https://github.com/datastax/spark-cassandra-connector).\n",
"\n",
"At the time of compiling this notebook, Spark 1.6.1 and Cassandra 3.5 were used.\n",
"Below is the command to install the Spark Scala kernel for Jupyter. More instructions on this topic are available on the Apache Toree [website](https://toree.incubator.apache.org/) and [GitHub pages](https://github.com/apache/incubator-toree).\n",
"\n",
"```shell\n",
"sudo /opt/local/Library/Frameworks/Python.framework/Versions/3.5/bin/jupyter-toree install --spark_home=${SPARK_HOME} --spark_opts='--packages com.datastax.spark:spark-cassandra-connector_2.10:1.6.0,graphframes:graphframes:0.1.0-spark1.6 --conf spark.cassandra.connection.host=localhost --conf spark.executor.memory=6g --conf spark.driver.memory=6g'\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"1.6.1"
]
},
"execution_count": 1,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"// Spark version\n",
"sc.version"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"// SQL context\n",
"import org.apache.spark.sql.SQLContext\n",
"import org.apache.spark.sql.functions._\n",
"\n",
"val sqlContext = new SQLContext(sc)\n",
"import sqlContext.implicits._"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"// spark-cassandra connector\n",
"import com.datastax.spark.connector._\n",
"import com.datastax.spark.connector.cql._\n",
"\n",
"import org.apache.spark.sql.cassandra.CassandraSQLContext\n",
"val cc = new CassandraSQLContext(sc)"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"val events = cc.sql(\"\"\"select ts, uid, lat, lon, vid from lbsn.events where\n",
" lon>-74.2589 and lon<-73.7004 and \n",
" lat> 40.4774 and lat< 40.9176\n",
" \"\"\").as(\"events\")\n",
"\n",
"val venues = cc.sql(\"select vid, name from lbsn.venues\").as(\"venues\")"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"138948"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"events.count()"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"[2009-12-17 14:58:23.0,83942,40.7586191119,-73.9888966084,225371]"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"events.first()"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"30366"
]
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"venues.count()"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"[1073929,Sputnik Gallery]"
]
},
"execution_count": 8,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"venues.first()"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"27"
]
},
"execution_count": 9,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"// User 0: how many checkins?\n",
"events.where(\"uid=0\").count()"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"[7239827,Central Park Manhattan ]"
]
},
"execution_count": 10,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"venues.where(\"vid = 7239827\").first()"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-------+--------------------+\n",
"|vid |name |\n",
"+-------+--------------------+\n",
"|1073929|Sputnik Gallery |\n",
"|1350193|Kaori's Closet Tokyo|\n",
"|1425555|Club Spain |\n",
"|7160165|La Quinta - Brooklyn|\n",
"|285750 |La Sorrentina |\n",
"+-------+--------------------+\n",
"only showing top 5 rows\n",
"\n"
]
}
],
"source": [
"venues.show(5, false)"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+---------------------+------+-------------+--------------+-------+--------------------+\n",
"|ts |uid |lat |lon |vid |name |\n",
"+---------------------+------+-------------+--------------+-------+--------------------+\n",
"|2009-12-17 14:58:23.0|83942 |40.7586191119|-73.9888966084|225371 |Smith's Bar & Grill |\n",
"|2010-08-18 03:08:36.0|166583|40.697517133 |-74.175481333 |1276055|Jake's Coffeehouse |\n",
"|2010-09-14 14:15:22.0|41568 |40.7538465143|-73.9845085144|1308099|The Southwest Porch |\n",
"|2009-12-04 16:24:03.0|74122 |40.721437025 |-73.9978015423|15998 |La Esquina |\n",
"|2009-12-04 16:23:01.0|74122 |40.7211204821|-73.9982306578|32674 |Blip.tv Headquarters|\n",
"+---------------------+------+-------------+--------------+-------+--------------------+\n",
"only showing top 5 rows\n",
"\n"
]
}
],
"source": [
"val df = events.\n",
" join(venues, events(\"events.vid\") === venues(\"venues.vid\"), \"inner\").\n",
" select(\"ts\", \"uid\", \"lat\", \"lon\", \"events.vid\",\"venues.name\")\n",
"\n",
"df.show(5,false)"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"112382"
]
},
"execution_count": 13,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"// filter new york\n",
"val df_ny = df.filter($\"lon\" > -74.2589 and $\"lon\" < -73.7004 and $\"lat\" > 40.4774 and $\"lat\" < 40.9176)\n",
"df_ny.count()"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"// UDF functions for SQL-like operations on columns\n",
"import org.joda.time.DateTime\n",
"import org.joda.time.DateTimeZone\n",
"\n",
"import java.sql.Timestamp\n",
"import org.apache.spark.sql.functions.udf\n",
"\n",
"val dayofweek = udf( (ts: Timestamp, tz: String) => {\n",
" val dt = new DateTime(ts,DateTimeZone.forID(tz))\n",
" // Joda: Monday = 1 .. Sunday = 7; shift so that Monday = 0\n",
" dt.getDayOfWeek() - 1\n",
"})\n",
"\n",
"val localhour = udf( (ts: Timestamp, tz: String) => {\n",
" val dt = new DateTime(ts,DateTimeZone.forID(tz))\n",
" dt.getHourOfDay()\n",
"})"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+---------------------+------+-------------+--------------+-------+--------------------+---+----+\n",
"|ts |uid |lat |lon |vid |name |dow|hour|\n",
"+---------------------+------+-------------+--------------+-------+--------------------+---+----+\n",
"|2009-12-17 14:58:23.0|83942 |40.7586191119|-73.9888966084|225371 |Smith's Bar & Grill |3 |17 |\n",
"|2010-08-18 03:08:36.0|166583|40.697517133 |-74.175481333 |1276055|Jake's Coffeehouse |2 |6 |\n",
"|2010-09-14 14:15:22.0|41568 |40.7538465143|-73.9845085144|1308099|The Southwest Porch |1 |17 |\n",
"|2009-12-04 16:24:03.0|74122 |40.721437025 |-73.9978015423|15998 |La Esquina |4 |19 |\n",
"|2009-12-04 16:23:01.0|74122 |40.7211204821|-73.9982306578|32674 |Blip.tv Headquarters|4 |19 |\n",
"+---------------------+------+-------------+--------------+-------+--------------------+---+----+\n",
"only showing top 5 rows\n",
"\n"
]
}
],
"source": [
"val newyork_tz = \"America/New_York\"\n",
"\n",
"val df = df_ny.\n",
" withColumn(\"dow\", dayofweek($\"ts\", lit(newyork_tz))).\n",
" withColumn(\"hour\", localhour($\"ts\", lit(newyork_tz))).\n",
" as(\"events\").cache()\n",
" \n",
"df.show(5, false)\n"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+---+-----+\n",
"|dow|count|\n",
"+---+-----+\n",
"| 0|14640|\n",
"| 1|15385|\n",
"| 2|15686|\n",
"| 3|16179|\n",
"| 4|16558|\n",
"| 5|18085|\n",
"| 6|15849|\n",
"+---+-----+\n",
"\n"
]
}
],
"source": [
"// histogram day of the week events\n",
"df.groupBy($\"dow\").count().show()"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----+-----+\n",
"|hour|count|\n",
"+----+-----+\n",
"|0 |2681 |\n",
"|1 |1272 |\n",
"|2 |852 |\n",
"|3 |554 |\n",
"|4 |323 |\n",
"|5 |550 |\n",
"|6 |937 |\n",
"|7 |2157 |\n",
"|8 |3615 |\n",
"|9 |4586 |\n",
"|10 |4903 |\n",
"|11 |5576 |\n",
"|12 |7257 |\n",
"|13 |8142 |\n",
"|14 |7687 |\n",
"|15 |7254 |\n",
"|16 |7519 |\n",
"|17 |7495 |\n",
"|18 |8621 |\n",
"|19 |8970 |\n",
"|20 |7167 |\n",
"|21 |6127 |\n",
"|22 |4747 |\n",
"|23 |3390 |\n",
"+----+-----+\n",
"\n"
]
}
],
"source": [
"// histogram hour of the day events\n",
"df.groupBy($\"hour\").count().show(24,false)"
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"import breeze.linalg._\n",
"import breeze.linalg.DenseVector\n",
"\n",
"import org.apache.spark.mllib.linalg.{Vector,Vectors}"
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"// vector histogram: the RDD way\n",
"\n",
"def toVector(i: Int, length:Int) = {\n",
" DenseVector((0 to length-1).map(x => if (x == i) 1.0 else 0.0).toArray)\n",
"}"
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"(225371,DenseVector(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0))"
]
},
"execution_count": 20,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df.select($\"vid\", $\"dow\").map(r => (r.getLong(0),toVector(r.getInt(1), 7))).first()"
]
},
{
"cell_type": "code",
"execution_count": 21,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"val dow_hist = df.\n",
" select($\"vid\", $\"dow\").\n",
" map(r => (r.getLong(0),toVector(r.getInt(1), 7))).\n",
" reduceByKey(_ + _).\n",
" mapValues(x => Vectors.dense((x / sum(x)).toArray)).\n",
" toDF(\"vid\", \"dow_hist\")"
]
},
{
"cell_type": "code",
"execution_count": 22,
"metadata": {
"collapsed": false,
"scrolled": true
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-------+-------------------------------+\n",
"|vid |dow_hist |\n",
"+-------+-------------------------------+\n",
"|713995 |[0.25,0.0,0.0,0.0,0.5,0.25,0.0]|\n",
"|4415530|[0.0,0.0,0.0,0.0,0.0,1.0,0.0] |\n",
"|486265 |[0.25,0.0,0.0,0.0,0.0,0.5,0.25]|\n",
"+-------+-------------------------------+\n",
"only showing top 3 rows\n",
"\n"
]
}
],
"source": [
"dow_hist.show(3, false)"
]
},
{
"cell_type": "code",
"execution_count": 23,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+---------------------+------+-------------+--------------+-------+---+-------------------------------+\n",
"|ts |uid |lat |lon |vid |dow|dow_hist |\n",
"+---------------------+------+-------------+--------------+-------+---+-------------------------------+\n",
"|2010-03-15 04:45:53.0|155999|40.7131656401|-74.0353098807|713995 |0 |[0.25,0.0,0.0,0.0,0.5,0.25,0.0]|\n",
"|2010-06-19 13:08:21.0|41235 |40.7131656401|-74.0353098807|713995 |5 |[0.25,0.0,0.0,0.0,0.5,0.25,0.0]|\n",
"|2010-04-23 20:47:34.0|10687 |40.7131656401|-74.0353098807|713995 |4 |[0.25,0.0,0.0,0.0,0.5,0.25,0.0]|\n",
"|2010-10-08 04:53:51.0|74222 |40.7131656401|-74.0353098807|713995 |4 |[0.25,0.0,0.0,0.0,0.5,0.25,0.0]|\n",
"|2010-09-25 10:02:38.0|181811|40.76603735 |-73.78952235 |4415530|5 |[0.0,0.0,0.0,0.0,0.0,1.0,0.0] |\n",
"+---------------------+------+-------------+--------------+-------+---+-------------------------------+\n",
"only showing top 5 rows\n",
"\n"
]
}
],
"source": [
"val df_probs = df.\n",
" join(dow_hist, df(\"vid\") === dow_hist(\"vid\"), \"inner\").\n",
" select(\"ts\", \"uid\", \"lat\", \"lon\", \"events.vid\", \"dow\", \"dow_hist\").\n",
" cache()\n",
"\n",
"df_probs.show(5,false)"
]
},
{
"cell_type": "code",
"execution_count": 24,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+---------------------+------+------+---+--------+\n",
"|ts |uid |vid |dow|dow_prob|\n",
"+---------------------+------+------+---+--------+\n",
"|2010-03-15 04:45:53.0|155999|713995|0 |0.25 |\n",
"|2010-06-19 13:08:21.0|41235 |713995|5 |0.25 |\n",
"|2010-04-23 20:47:34.0|10687 |713995|4 |0.5 |\n",
"+---------------------+------+------+---+--------+\n",
"only showing top 3 rows\n",
"\n"
]
}
],
"source": [
"val nth = udf( (i:Int, arr: Vector) => {\n",
" arr.toArray.lift(i).getOrElse(0.0)\n",
"})\n",
"\n",
"df_probs.select($\"ts\", $\"uid\", $\"vid\", $\"dow\", nth($\"dow\", $\"dow_hist\").as(\"dow_prob\")).show(3,false)"
]
},
{
"cell_type": "code",
"execution_count": 25,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-------+---------------------------------------------------------------------------------------------------+\n",
"|vid |hour_hist |\n",
"+-------+---------------------------------------------------------------------------------------------------+\n",
"|713995 |[0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.5,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.25,0.0,0.0,0.0,0.0,0.0,0.0,0.25]|\n",
"|4415530|[0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0] |\n",
"|486265 |[0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.5,0.0,0.0,0.0,0.0,0.25,0.0,0.25,0.0,0.0,0.0,0.0]|\n",
"+-------+---------------------------------------------------------------------------------------------------+\n",
"only showing top 3 rows\n",
"\n"
]
}
],
"source": [
"// same for hour of the day\n",
"\n",
"val hour_hist = df.\n",
" select($\"vid\", $\"hour\").\n",
" map(r => (r.getLong(0),toVector(r.getInt(1), 24))).\n",
" reduceByKey(_ + _).\n",
" mapValues(x => Vectors.dense((x / sum(x)).toArray)).\n",
" toDF(\"vid\", \"hour_hist\")\n",
"\n",
"hour_hist.show(3, false)"
]
},
{
"cell_type": "code",
"execution_count": 26,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+---------------------+------+-------------+--------------+-----+--------------------+-------------------+\n",
"|ts |uid |lat |lon |vid |hour_prob |dow_prob |\n",
"+---------------------+------+-------------+--------------+-----+--------------------+-------------------+\n",
"|2010-08-13 23:28:29.0|125327|40.7490532543|-73.9680397511|11831|0.015873015873015872|0.19047619047619047|\n",
"|2010-09-29 12:08:36.0|578 |40.7490532543|-73.9680397511|11831|0.12698412698412698 |0.2222222222222222 |\n",
"|2010-09-05 13:20:27.0|578 |40.7490532543|-73.9680397511|11831|0.047619047619047616|0.12698412698412698|\n",
"|2010-07-21 07:02:07.0|578 |40.7490532543|-73.9680397511|11831|0.07936507936507936 |0.2222222222222222 |\n",
"|2010-07-20 14:24:06.0|578 |40.7490532543|-73.9680397511|11831|0.07936507936507936 |0.14285714285714285|\n",
"+---------------------+------+-------------+--------------+-----+--------------------+-------------------+\n",
"only showing top 5 rows\n",
"\n"
]
}
],
"source": [
"val df_probs = df.\n",
" join(dow_hist, df(\"vid\") === dow_hist(\"vid\"), \"inner\").\n",
" join(hour_hist, df(\"vid\") === hour_hist(\"vid\"), \"inner\").\n",
" select( \n",
" $\"ts\", \n",
" $\"uid\", \n",
" $\"lat\", \n",
" $\"lon\", \n",
" $\"events.vid\", \n",
" nth($\"hour\", $\"hour_hist\").as(\"hour_prob\"), \n",
" nth($\"dow\", $\"dow_hist\").as(\"dow_prob\")).\n",
" cache()\n",
"\n",
"df_probs.show(5,false)"
]
},
{
"cell_type": "code",
"execution_count": 27,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"// process mining\n",
"val g_df = events.\n",
" select($\"ts\", $\"uid\", $\"vid\").\n",
" rdd.\n",
" map(row => (row.getLong(1), List( (row.getTimestamp(0), row.getLong(2)) ))).\n",
" reduceByKey(_ ++ _).\n",
" mapValues( x =>\n",
" x.sortWith(_._1.getTime < _._1.getTime).\n",
" map(_._2)\n",
" ).\n",
" mapValues(_.sliding(2).toList).\n",
" flatMap(_._2).\n",
" map(\n",
" _ match {\n",
" case List(a, b) => Some((a, b))\n",
" case _ => None\n",
" }).\n",
" flatMap(x => x).\n",
" toDF(\"src\", \"dst\").\n",
" cache()"
]
},
{
"cell_type": "code",
"execution_count": 28,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+------+------+\n",
"| src| dst|\n",
"+------+------+\n",
"|255148|603177|\n",
"|603177|603177|\n",
"|603177|603177|\n",
"|603177|603177|\n",
"|603177|188022|\n",
"+------+------+\n",
"only showing top 5 rows\n",
"\n"
]
}
],
"source": [
"g_df.show(5)"
]
},
{
"cell_type": "code",
"execution_count": 39,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+------+------+------+\n",
"| src| dst|weight|\n",
"+------+------+------+\n",
"| 12505| 12505| 240.0|\n",
"| 13022| 13022| 188.0|\n",
"| 23261| 23261| 163.0|\n",
"| 24963| 24963| 154.0|\n",
"|400740|400740| 84.0|\n",
"+------+------+------+\n",
"only showing top 5 rows\n",
"\n"
]
}
],
"source": [
"// optionally prune rare transitions by appending .filter($\"weight\" > 1.0)\n",
"val edges_df = g_df.groupBy($\"src\",$\"dst\").count().withColumn(\"weight\", $\"count\".cast(\"double\")).drop(\"count\").cache()\n",
"edges_df.orderBy(desc(\"weight\")).show(5)"
]
},
{
"cell_type": "code",
"execution_count": 40,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"110235"
]
},
"execution_count": 40,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"edges_df.count()"
]
},
{
"cell_type": "code",
"execution_count": 41,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"21438"
]
},
"execution_count": 41,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"val nodes_df = edges_df.select($\"src\").distinct().unionAll(edges_df.select($\"dst\").distinct()).distinct().toDF(\"id\").cache()\n",
"nodes_df.count()"
]
},
{
"cell_type": "code",
"execution_count": 42,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import org.graphframes.GraphFrame\n",
"\n",
"val g = GraphFrame(nodes_df, edges_df)"
]
},
{
"cell_type": "code",
"execution_count": 63,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"val results = g.pageRank.resetProbability(0.05).maxIter(5).run()"
]
},
{
"cell_type": "code",
"execution_count": 64,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+------+------------------+---------------------------------+\n",
"|vid |pagerank |name |\n",
"+------+------------------+---------------------------------+\n",
"|12505 |27.5777391608751 |LGA LaGuardia Airport |\n",
"|23261 |26.066573954316933|JFK John F. Kennedy International|\n",
"|11844 |21.850881240333695|Times Square |\n",
"|13022 |17.556352719881815|Grand Central Terminal |\n",
"|24963 |16.261458952885967|EWR Newark Liberty International |\n",
"|11875 |10.109985490864041|Madison Square Garden |\n",
"|12525 |10.074957734472436|The Museum of Modern Art (MoMA) |\n",
"|11720 |9.90862194915849 |Yankee Stadium |\n",
"|106840|9.685021905149734 |Union Square |\n",
"|11834 |8.874649776756282 |Bryant Park |\n",
"|12313 |8.087135577458076 |Empire State Building |\n",
"|14151 |6.953063528162026 |Rockefeller Center |\n",
"|17710 |6.917629638959865 |Madison Square Park |\n",
"+------+------------------+---------------------------------+\n",
"only showing top 13 rows\n",
"\n"
]
}
],
"source": [
"val vertices = results.vertices.select(\"id\", \"pagerank\")\n",
"val popular_venues = vertices.join(venues, vertices(\"id\") === venues(\"vid\"), \"inner\").select(\"vid\", \"pagerank\", \"name\")\n",
"\n",
"popular_venues.sort($\"pagerank\".desc).show(13, false)"
]
},
{
"cell_type": "code",
"execution_count": 97,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"val e_df = events.\n",
" select(\"uid\",\"lat\",\"lon\").\n",
" rdd.map(row => (row.getLong(0), Array(row.getDouble(1), row.getDouble(2))) ).\n",
" reduceByKey( _ ++ _).\n",
" mapValues(v => new DenseMatrix(v.length/2,2,v, 0, 2, true))\n"
]
},
{
"cell_type": "code",
"execution_count": 118,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"def formatUserEvents(x: Tuple2[Long, DenseMatrix[Double]]) : Unit = {\n",
" val arr = x._2\n",
" val slice = arr(0 to min(5,arr.rows)-1, ::)\n",
" println(s\"uid = ${x._1}\")\n",
" println(s\"events count = ${arr.rows}\")\n",
" println(\"lat,lon = \")\n",
" println(slice)\n",
" if (arr.rows > 5) println(s\"... ${arr.rows- 5} more rows\")\n",
" println(\"-\"*30)\n",
"}"
]
},
{
"cell_type": "code",
"execution_count": 120,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"uid = 33590\n",
"events count = 355\n",
"lat,lon = \n",
"40.735486752 -73.979792352 \n",
"40.73723724 -73.980824105 \n",
"40.7424578176 -73.9837482386 \n",
"40.70736247 -74.008827565 \n",
"40.737524907 -73.978869923 \n",
"... 350 more rows\n",
"------------------------------\n",
"uid = 139605\n",
"events count = 4\n",
"lat,lon = \n",
"40.7126315333 -74.0444852833 \n",
"40.6993813387 -74.0393972397 \n",
"40.69856285 -74.03974055 \n",
"40.6987988833 -74.0400195 \n",
"------------------------------\n",
"uid = 108050\n",
"events count = 1\n",
"lat,lon = \n",
"40.7425115937 -74.0060305595 \n",
"------------------------------\n"
]
}
],
"source": [
"e_df.take(3).foreach(e => formatUserEvents(e))"
]
},
{
"cell_type": "code",
"execution_count": 124,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import breeze.numerics._\n",
"import breeze.linalg._\n",
"\n",
"def euclideanDistance (a: DenseVector[Double], b: DenseVector[Double]) = norm(a-b, 2)"
]
},
{
"cell_type": "code",
"execution_count": 134,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import nak.cluster._\n",
"import nak.cluster.GDBSCAN._\n",
"\n",
"def dbscan(v : breeze.linalg.DenseMatrix[Double]) = {\n",
"\n",
" val gdbscan = new GDBSCAN(\n",
" DBSCAN.getNeighbours(epsilon = 0.1, distance=euclideanDistance),\n",
" DBSCAN.isCorePoint(minPoints = 3)\n",
" )\n",
"\n",
" val clusters = gdbscan cluster v\n",
" \n",
" // reduce each cluster to a bounding box:\n",
" // for simplicity, summarize each cluster by its min/max lat and lon\n",
" clusters.map(\n",
" cluster => (\n",
" cluster.id.toInt, \n",
" cluster.points.size, \n",
" cluster.points.map(_.value(0)).min,\n",
" cluster.points.map(_.value(1)).min,\n",
" cluster.points.map(_.value(0)).max,\n",
" cluster.points.map(_.value(1)).max\n",
" )\n",
" )\n",
"}"
]
},
{
"cell_type": "code",
"execution_count": 136,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"val bboxRdd = e_df.mapValues(dbscan(_))"
]
},
{
"cell_type": "code",
"execution_count": 150,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+---+-----+-------------+--------------+-------------+--------------+\n",
"| uid|cid|csize| lat_min| lon_min| lat_max| lon_max|\n",
"+-----+---+-----+-------------+--------------+-------------+--------------+\n",
"|33590| 66| 6| 40.837230222| -73.88030725|40.8538124533| -73.782456383|\n",
"|76360| 51| 2|40.8117304816|-74.0675711632|40.8117304816|-74.0675711632|\n",
"| 4890| 2| 85|40.7008291704|-74.0347717925| 40.792863965|-73.9727404494|\n",
"| 4890| 85| 3|40.6891968241|-74.0472292833|40.6993813387|-74.0393972397|\n",
"|32640| 1| 12|40.6394147833|-74.0765082836|40.7058475346| -73.98080408|\n",
"|32640| 2| 12|40.7113103667| -74.0023141|40.7317873406| -73.94982115|\n",
"| 6220| 1| 65| 40.70358305| -74.009206295|40.7636900022|-73.9719665051|\n",
"| 6220| 56| 2| 40.85011295|-73.9520859718|40.8514833821|-73.9468287333|\n",
"|10100| 1| 59|40.7190166905| -74.008749967|40.7805414319| -73.964475333|\n",
"| 4605| 2| 4|40.7208271761|-73.9988958836| 40.752604|-73.9944867037|\n",
"+-----+---+-----+-------------+--------------+-------------+--------------+\n",
"only showing top 10 rows\n",
"\n"
]
}
],
"source": [
"val bbox_df = bboxRdd.\n",
" flatMapValues(x => x).\n",
" map(x => (x._1, x._2._1, x._2._2,x._2._3,x._2._4,x._2._5,x._2._6)).\n",
" toDF(\"uid\", \"cid\", \"csize\", \"lat_min\", \"lon_min\", \"lat_max\", \"lon_max\").\n",
" filter($\"cid\" > 0)\n",
"\n",
"bbox_df.show(10)"
]
},
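{
"cell_type": "markdown",
"metadata": {},
"source": [
"The setup notes above mention writing models and results back to Cassandra, so here is a minimal sketch of that last step for the cluster bounding boxes. It assumes a table `lbsn.user_clusters` (a hypothetical name) has already been created with columns matching `bbox_df`, e.g.:\n",
"\n",
"```sql\n",
"CREATE TABLE lbsn.user_clusters (\n",
"  uid bigint, cid int, csize int,\n",
"  lat_min double, lon_min double, lat_max double, lon_max double,\n",
"  PRIMARY KEY (uid, cid));\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"// sketch: write the per-user cluster bounding boxes back to Cassandra\n",
"// using the spark-cassandra-connector DataFrame source\n",
"// (assumes the lbsn.user_clusters table above exists)\n",
"bbox_df.write.\n",
"  format(\"org.apache.spark.sql.cassandra\").\n",
"  options(Map(\"keyspace\" -> \"lbsn\", \"table\" -> \"user_clusters\")).\n",
"  mode(\"append\").\n",
"  save()"
]
},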
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"// notes for further analysis: pagerank,\n",
"// popularity\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Apache Toree - Scala",
"language": "scala",
"name": "apache_toree_scala"
},
"language_info": {
"name": "scala",
"version": "2.10.4"
}
},
"nbformat": 4,
"nbformat_minor": 0
}