Clustering venues with Spark: The good, The bad, The ugly
// $SPARK_HOME/bin/spark-shell --master spark://localhost:7077 --packages com.datastax.spark:spark-cassandra-connector_2.10:1.5.0-M2 --conf spark.cassandra.connection.host=localhost
// let's do some data science.
// Idea:
//
// venues exhibit a typical visit pattern during the week.
// Some venues get more check-ins during the weekend, others during midweek.
// Let's apply machine learning to cluster venues which exhibit
// the same visiting behavior during the week.
// Get the data.
// Let's use a fancy datastore, and extract the data from Cassandra
// CREATE TABLE checkins (
//   ts timestamp,
//   uid bigint,
//   vid bigint,
//   lat double,
//   lon double,
//   PRIMARY KEY (ts, uid)
// ) WITH CLUSTERING ORDER BY (uid ASC);
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
// bummer 1:
// the cassandra-spark connector works with Long, java.util.Date, java.sql.Date, org.joda.time.DateTime
// but does not support java.sql.Timestamp, which is the chosen timestamp type for Spark SQL
// first way: reflect a case class, then fix the timestamp type by casting the long column to a Spark SQL timestamp
case class Checkin(ts: Long, uid: Long, vid: Long)
val df = sc.cassandraTable[Checkin]("lbsn", "checkins").
  select("ts", "uid", "vid").
  toDF.
  withColumn("ts", $"ts".cast("timestamp"))
df.show(5, false)
// +---------------------+-----+-------+
// |ts                   |uid  |vid    |
// +---------------------+-----+-------+
// |2010-06-01 02:11:30.0|15584|184218 |
// |2010-06-01 02:14:39.0|24672|1205388|
// |2010-06-01 02:16:21.0|34131|34251  |
// |2010-06-01 02:23:36.0|2798 |492100 |
// |2010-06-01 02:32:21.0|59263|1204811|
// +---------------------+-----+-------+
// second way: use the Cassandra SQL context
import org.apache.spark.sql.cassandra.CassandraSQLContext
val cc = new CassandraSQLContext(sc)
val df = cc.sql("select ts, uid, vid from lbsn.checkins")
df.show(5, false)
// +---------------------+-----+-------+
// |ts                   |uid  |vid    |
// +---------------------+-----+-------+
// |2010-06-01 02:11:30.0|15584|184218 |
// |2010-06-01 02:14:39.0|24672|1205388|
// |2010-06-01 02:16:21.0|34131|34251  |
// |2010-06-01 02:23:36.0|2798 |492100 |
// |2010-06-01 02:32:21.0|59263|1204811|
// +---------------------+-----+-------+
// ok. The data is in Spark now, let's start featurizing!
// add a column with the day of the week!
// bummer 2: no day-of-week function in Spark SQL ...
// Need to convert from java.sql.Timestamp to joda
// yet another time-type related conversion ...
import org.joda.time.DateTime
import org.joda.time.DateTimeZone.UTC
import java.sql.Timestamp
import org.apache.spark.sql.functions.udf
val dayOfWeek = udf( (ts: Timestamp) => {
  val dt = new DateTime(ts, UTC)
  dt.getDayOfWeek()
})
val df_dow = df.withColumn("dow", dayOfWeek($"ts"))
df_dow.show(5, false)
// +---------------------+-----+-------+---+
// |ts                   |uid  |vid    |dow|
// +---------------------+-----+-------+---+
// |2010-06-01 02:11:30.0|15584|184218 |2  |
// |2010-06-01 02:14:39.0|24672|1205388|2  |
// |2010-06-01 02:16:21.0|34131|34251  |2  |
// |2010-06-01 02:23:36.0|2798 |492100 |2  |
// |2010-06-01 02:32:21.0|59263|1204811|2  |
// +---------------------+-----+-------+---+
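// Hypothetical alternative (not in the original gist): Spark 1.5 also ships a
// date_format function, and the SimpleDateFormat pattern "u" yields the day
// number of the week (1 = Monday .. 7 = Sunday). It goes through a string and
// uses the JVM default timezone rather than an explicit UTC, so treat it as a
// sketch rather than a drop-in replacement for the joda udf above.
import org.apache.spark.sql.functions.date_format
val df_dow_alt = df.withColumn("dow", date_format($"ts", "u").cast("int"))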
// from scalar day-of-week to a weekly vector!
import breeze.linalg.DenseVector
val weekVector = udf( (i: Int) => {
  DenseVector((1 to 7).map(x => if (x == i) 1.0 else 0.0).toArray)
})
df_dow.select($"ts", $"dow", weekVector($"dow").as("weekly_hist"))
// bummer 3: breeze.linalg.DenseVector is not supported
// java.lang.UnsupportedOperationException: Schema for type breeze.linalg.DenseVector[Double] is not supported
//   at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:153)
// switch to the spark mllib Vector (the one Spark SQL understands),
// which has the same name but not the same implementation as the breeze one
import org.apache.spark.mllib.linalg.{Vector, Vectors}
val weekVector = udf( (i: Int) => {
  Vectors.dense((1 to 7).map(x => if (x == i) 1.0 else 0.0).toArray)
})
val df_weekly = df_dow.select($"vid", weekVector($"dow").as("weekly_hist"))
df_weekly.show(5, false)
// +-------+-----------------------------+
// |vid    |weekly_hist                  |
// +-------+-----------------------------+
// |184218 |[0.0,1.0,0.0,0.0,0.0,0.0,0.0]|
// |1205388|[0.0,1.0,0.0,0.0,0.0,0.0,0.0]|
// |34251  |[0.0,1.0,0.0,0.0,0.0,0.0,0.0]|
// |492100 |[0.0,1.0,0.0,0.0,0.0,0.0,0.0]|
// |1204811|[0.0,1.0,0.0,0.0,0.0,0.0,0.0]|
// +-------+-----------------------------+
// awesome! now let's aggregate and sum the vectors belonging to each vid
// monoid style! Add everything up, in this case vectors!
// bummer 4: this does not work. No vector math for columnar expressions
df_weekly.groupBy("vid").sum("weekly_hist")
// org.apache.spark.sql.AnalysisException: "weekly_hist" is not a numeric column. Aggregation function can only be applied on a numeric column.;
//   at org.apache.spark.sql.GroupedData$$anonfun$2.apply(GroupedData.scala:112)
// ok, no worries. Back to the breeze vector,
// oh wait a second. Need to convert from org.apache.spark.mllib.linalg.DenseVector first
// spark sql is not an option (see bummer 3), so back to good old RDDs
val d = df_weekly.map(row => (row.getLong(0), DenseVector(row.getAs[Vector]("weekly_hist").toArray))).
  reduceByKey(_ + _).toDF("vid", "hist")
// java.lang.UnsupportedOperationException: Schema for type breeze.linalg.DenseVector[Double] is not supported
//   at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:153)
// OK, back to a spark DataFrame. No wait!
// bummer 5: schema not supported for breeze.linalg.DenseVector[Double]
// what was I thinking? cast back to the spark DenseVector before going to a DataFrame
val d = df_weekly.map(row => (row.getLong(0), DenseVector(row.getAs[Vector]("weekly_hist").toArray))).
  reduceByKey(_ + _).
  mapValues(x => Vectors.dense(x.toArray)).
  toDF("vid", "hist")
d.show(5, false)
// +-------+-----------------------------+
// |vid    |hist                         |
// +-------+-----------------------------+
// |475830 |[0.0,1.0,0.0,0.0,0.0,0.0,1.0]|
// |298530 |[0.0,0.0,1.0,1.0,0.0,0.0,0.0]|
// |3706430|[0.0,2.0,0.0,0.0,0.0,0.0,0.0]|
// |5770820|[1.0,0.0,0.0,0.0,0.0,0.0,0.0]|
// |891180 |[0.0,0.0,0.0,0.0,1.0,0.0,0.0]|
// +-------+-----------------------------+
// Yeah this rocks!
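// Hypothetical alternative (not in the original gist): sidestep the per-row
// vectors and the breeze round-trip altogether by counting check-ins per
// (vid, dow) and assembling the 7-bucket histogram on the RDD side.
// Uses the df_dow DataFrame from above; dow is 1 (Monday) .. 7 (Sunday).
val d_alt = df_dow.groupBy("vid", "dow").count().
  map(row => (row.getLong(0), (row.getInt(1), row.getLong(2).toDouble))).
  groupByKey().
  mapValues { counts =>
    val hist = Array.fill(7)(0.0)
    counts.foreach { case (dow, c) => hist(dow - 1) = c }
    Vectors.dense(hist)
  }.
  toDF("vid", "hist")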
// Moving on to feature transformation!
// Let's use spark ML and pipelines!
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{DCT, Normalizer}
val dct = new DCT().setInputCol("norm").setOutputCol("dct").setInverse(false)
val normalizer = new Normalizer().setInputCol("hist").setOutputCol("norm").setP(1.0)
val pipeline = new Pipeline().setStages(Array(normalizer, dct))
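// Note (added): the L1 Normalizer turns each weekly histogram into a
// distribution over the 7 days (so busy and quiet venues become comparable),
// and the DCT maps that distribution to frequency components, so venues with
// a similar weekly rhythm end up close to each other in feature space.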
// bit puzzled: why should I call "fit" if I am just transforming columns?
// at this point this is the least of my problems ...
val model = pipeline.fit(d)
val dd = model.transform(d)
dd.show(5)
// +-------+--------------------+--------------------+--------------------+
// |    vid|                hist|                norm|                 dct|
// +-------+--------------------+--------------------+--------------------+
// | 475830|[0.0,1.0,0.0,0.0,...|[0.0,0.5,0.0,0.0,...|[0.37796447300922...|
// | 298530|[0.0,0.0,1.0,1.0,...|[0.0,0.0,0.5,0.5,...|[0.37796447300922...|
// |3706430|[0.0,2.0,0.0,0.0,...|[0.0,1.0,0.0,0.0,...|[0.37796447300922...|
// |5770820|[1.0,0.0,0.0,0.0,...|[1.0,0.0,0.0,0.0,...|[0.37796447300922...|
// | 891180|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|[0.37796447300922...|
// +-------+--------------------+--------------------+--------------------+
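// Side note (added): Normalizer and DCT are plain Transformers, so fit() has
// nothing to estimate here; it is only needed because a Pipeline is itself an
// Estimator. A minimal sketch without the Pipeline wrapper gives the same columns:
val dd_alt = dct.transform(normalizer.transform(d))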
// k-means here!
// bummer 6: k-means is not yet in ML, so there is no "fitting" to k-means clusters
// not an issue, back to MLlib, and get the job done!
import org.apache.spark.mllib.clustering.KMeans
val f = dd.select("dct").rdd.map(r => r.getAs[Vector]("dct")).cache()
val numClusters = 10
val numIterations = 20
val clusters = KMeans.train(f, numClusters, numIterations)
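// Optional (added): MLlib's KMeansModel exposes computeCost, the within-set
// sum of squared errors, which is a quick way to sanity-check the choice of numClusters.
val wssse = clusters.computeCost(f)
println(s"Within Set Sum of Squared Errors = $wssse")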
// show the clusters in all their glory :)
clusters.clusterCenters.take(4).foreach(println)
// [0.37796447300922825,-0.40672737445206353,0.36529770485070134,-0.33499600248264894,0.24038285478039526,-0.20109941979499654,0.11627968707978312]
// [0.3779644730092282,-0.40370501114499235,0.11167131384472952,0.22008696706620345,-0.46678202966385474,0.5070864326593763,-0.3246375761355341]
// [0.37796447300922825,-0.009577269778527991,-0.16964806001109617,0.009083101707591823,0.1752818101815428,-0.0016188503764671164,-0.16579029329876688]
// [0.3779644730092282,-0.22053024891116707,-0.3107474508302649,0.4915304998280451,-0.110178523608113,-0.3924890556900048,0.4511667408123527]
// yay, we have got our model and the cluster centers
// Now write the scoring either as a custom ML transformation or as a UDF ...
// going for the UDF
import org.apache.spark.sql.functions.udf
val kMeansScore = udf((v: Vector) => clusters.predict(v))
// add predictions as an extra column, by using a user-defined function
// remember that the udf closes over clusters, so the model gets serialized and shipped to the executors
val res = dd.withColumn("cluster", kMeansScore($"dct"))
// Done.
res.show(5)
// +-------+--------------------+--------------------+--------------------+-------+
// |    vid|                hist|                norm|                 dct|cluster|
// +-------+--------------------+--------------------+--------------------+-------+
// | 475830|[0.0,1.0,0.0,0.0,...|[0.0,0.5,0.0,0.0,...|[0.37796447300922...|      0|
// | 298530|[0.0,0.0,1.0,1.0,...|[0.0,0.0,0.5,0.5,...|[0.37796447300922...|      2|
// |3706430|[0.0,2.0,0.0,0.0,...|[0.0,1.0,0.0,0.0,...|[0.37796447300922...|      5|
// |5770820|[1.0,0.0,0.0,0.0,...|[1.0,0.0,0.0,0.0,...|[0.37796447300922...|      8|
// | 891180|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|[0.37796447300922...|      3|
// +-------+--------------------+--------------------+--------------------+-------+
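// Sketch (added): to be explicit about how the model reaches the executors,
// wrap it in a broadcast variable instead of relying on closure serialization.
val bcClusters = sc.broadcast(clusters)
val kMeansScoreBc = udf((v: Vector) => bcClusters.value.predict(v))
val res_bc = dd.withColumn("cluster", kMeansScoreBc($"dct"))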
// Lessons learned.
// The good:
// we got the job done: featurized, transformed, clustered, fit and scored,
// running the algorithm in a scalable fashion, without worrying about parallelism
// the spark-cassandra connector is great
// spark sql and DataFrame provide the right abstraction for data scientists
// ML and pipelines are a good idea, especially when combined with Spark Streaming
//
// The bad:
// Spark SQL functions and support are still early days
// Same for ML vs MLlib: more functions and transformations are needed
// Need more data science, more algorithms, better integration with Breeze
//
// The ugly:
// SQL types are quite hard and unforgiving, especially when it comes to time
// Spark MLlib's vector implementation hides a lot of the awesomeness of the breeze library
// Still quite some friction to deal with. You might be tempted to just use RDDs
// and build your data science from there.
Totally agree. I'm discovering all these issues one by one.