@natbusa
Last active June 2, 2016 20:55
Clustering venues with Spark: The good, The bad, The ugly
// $SPARK_HOME/bin/spark-shell --master spark://localhost:7077 --packages com.datastax.spark:spark-cassandra-connector_2.10:1.5.0-M2 --conf spark.cassandra.connection.host=localhost
// let's do some data science.
// Idea:
//
// venues exhibit a typical visit pattern during the week:
// some venues get more check-ins during the weekend, others during midweek.
// Let's apply machine learning to cluster venues which exhibit
// the same visiting behavior during the week.
// Get the data.
// Let's use a fancy datastore and extract the data from Cassandra.
// CREATE TABLE checkins (
// ts timestamp,
// uid bigint,
// vid bigint,
// lat double,
// lon double,
// PRIMARY KEY (ts, uid)
// ) WITH CLUSTERING ORDER BY (uid ASC);
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
// bummer 1:
// the cassandra-spark connector maps timestamps to Long, java.util.Date, java.sql.Date, or org.joda.time.DateTime,
// but does not support java.sql.Timestamp, which is the timestamp type used by Spark SQL
// first way: reflect a case class, then fix the timestamp type by casting the long column to a SQL timestamp
case class Checkin(ts: Long, uid: Long, vid: Long)
val df = sc.cassandraTable[Checkin]("lbsn", "checkins").
  select("ts", "uid", "vid").
  toDF.
  withColumn("ts", $"ts".cast("timestamp"))
df.show(5, false)
// +---------------------+-----+-------+
// |ts |uid |vid |
// +---------------------+-----+-------+
// |2010-06-01 02:11:30.0|15584|184218 |
// |2010-06-01 02:14:39.0|24672|1205388|
// |2010-06-01 02:16:21.0|34131|34251 |
// |2010-06-01 02:23:36.0|2798 |492100 |
// |2010-06-01 02:32:21.0|59263|1204811|
// +---------------------+-----+-------+
// second way: use the Cassandra sql context
import org.apache.spark.sql.cassandra.CassandraSQLContext
val cc = new CassandraSQLContext(sc)
val df = cc.sql("select ts, uid, vid from lbsn.checkins")
df.show(5, false)
// +---------------------+-----+-------+
// |ts |uid |vid |
// +---------------------+-----+-------+
// |2010-06-01 02:11:30.0|15584|184218 |
// |2010-06-01 02:14:39.0|24672|1205388|
// |2010-06-01 02:16:21.0|34131|34251 |
// |2010-06-01 02:23:36.0|2798 |492100 |
// |2010-06-01 02:32:21.0|59263|1204811|
// +---------------------+-----+-------+
// ok. The data is in Spark now, let's start featurizing!
// add a column with the day of the week!
// bummer 2: no day-of-week function in Spark SQL ...
// need to cast from java.sql.Timestamp to a Joda DateTime:
// yet another time-type-related cast ...
import org.joda.time.DateTime
import org.joda.time.DateTimeZone.UTC
import java.sql.Timestamp
import org.apache.spark.sql.functions.udf
val dayOfWeek = udf( (ts: Timestamp) => {
  val dt = new DateTime(ts, UTC)
  dt.getDayOfWeek()
})
val df_dow = df.withColumn("dow", dayOfWeek($"ts"))
df_dow.show(5, false)
// +---------------------+-----+-------+---+
// |ts |uid |vid |dow|
// +---------------------+-----+-------+---+
// |2010-06-01 02:11:30.0|15584|184218 |2 |
// |2010-06-01 02:14:39.0|24672|1205388|2 |
// |2010-06-01 02:16:21.0|34131|34251 |2 |
// |2010-06-01 02:23:36.0|2798 |492100 |2 |
// |2010-06-01 02:32:21.0|59263|1204811|2 |
// +---------------------+-----+-------+---+
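// (aside: later Spark versions ship a built-in for this, e.g. the dayofweek()
// function in org.apache.spark.sql.functions from Spark 2.3 on, so the UDF above
// would not be needed there; note it counts 1 = Sunday, while Joda's getDayOfWeek
// counts 1 = Monday as in the output above -- hypothetical usage, not run here:)
// val df_dow = df.withColumn("dow", dayofweek($"ts"))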
// from a scalar day-of-week to a weekly vector!
import breeze.linalg.DenseVector
val weekVector = udf( (i: Int) => {
  DenseVector((1 to 7).map(x => if (x == i) 1.0 else 0.0).toArray)
})
df_dow.select($"ts", $"dow", weekVector($"dow").as("weekly_hist"))
// bummer 3: breeze.linalg.DenseVector not supported
// java.lang.UnsupportedOperationException: Schema for type breeze.linalg.DenseVector[Double] is not supported
// at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:153)
// switch to the Spark MLlib Vector,
// which has the same name as Breeze's but is a different implementation
import org.apache.spark.mllib.linalg.{Vector,Vectors}
val weekVector = udf( (i: Int) => {
  Vectors.dense((1 to 7).map(x => if (x == i) 1.0 else 0.0).toArray)
})
val df_weekly = df_dow.select($"vid", weekVector($"dow").as("weekly_hist"))
df_weekly.show(5, false)
// +-------+-----------------------------+
// |vid |weekly_hist |
// +-------+-----------------------------+
// |184218 |[0.0,1.0,0.0,0.0,0.0,0.0,0.0]|
// |1205388|[0.0,1.0,0.0,0.0,0.0,0.0,0.0]|
// |34251 |[0.0,1.0,0.0,0.0,0.0,0.0,0.0]|
// |492100 |[0.0,1.0,0.0,0.0,0.0,0.0,0.0]|
// |1204811|[0.0,1.0,0.0,0.0,0.0,0.0,0.0]|
// +-------+-----------------------------+
// awesome! now let's aggregate and sum the vectors belonging to each vid
// monoid style! Add everything up, in this case vectors
// bummer 4: this does not work: no vector math for columnar expressions
df_weekly.groupBy("vid").sum("weekly_hist")
// org.apache.spark.sql.AnalysisException: "weekly_hist" is not a numeric column. Aggregation function can only be applied on a numeric column.;
// at org.apache.spark.sql.GroupedData$$anonfun$2.apply(GroupedData.scala:112)
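// (aside: a DataFrame-only alternative would avoid the vector UDF altogether and
// count check-ins per (vid, dow) with a pivot, assembling the seven count columns
// afterwards with ml.feature.VectorAssembler; pivot() only exists from Spark 1.6 on,
// so this is just a hypothetical sketch, not something run here:)
// val counts = df_dow.groupBy("vid").pivot("dow", 1 to 7).count().na.fill(0.0)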
// ok, no worries, back to the breeze vector ...
// oh wait a second: need to recast from org.apache.spark.mllib.linalg.DenseVector first,
// and spark sql is not an option (see bummer 3), so back to good old RDDs
val d = df_weekly.map(row => (row.getLong(0), DenseVector(row.getAs[Vector]("weekly_hist").toArray))).
  reduceByKey(_ + _).toDF("vid", "hist")
// java.lang.UnsupportedOperationException: Schema for type breeze.linalg.DenseVector[Double] is not supported
// at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:153)
// OK, back to a spark DataFrame. No wait!
// bummer 5: schema not supported for breeze.linalg.DenseVector[Double]
// what was I thinking? Recast back to a spark DenseVector before calling toDF
val d = df_weekly.map(row => (row.getLong(0), DenseVector(row.getAs[Vector]("weekly_hist").toArray))).
  reduceByKey(_ + _).
  mapValues(x => Vectors.dense(x.toArray)).
  toDF("vid", "hist")
d.show(5, false)
// +-------+-----------------------------+
// |vid |hist |
// +-------+-----------------------------+
// |475830 |[0.0,1.0,0.0,0.0,0.0,0.0,1.0]|
// |298530 |[0.0,0.0,1.0,1.0,0.0,0.0,0.0]|
// |3706430|[0.0,2.0,0.0,0.0,0.0,0.0,0.0]|
// |5770820|[1.0,0.0,0.0,0.0,0.0,0.0,0.0]|
// |891180 |[0.0,0.0,0.0,0.0,1.0,0.0,0.0]|
// +-------+-----------------------------+
// Yeah this rocks!
// Moving on to feature transformation!
// Let's use spark ML and pipelines!
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{DCT, Normalizer}
val dct = new DCT().setInputCol("norm").setOutputCol("dct").setInverse(false)
val normalizer = new Normalizer().setInputCol("hist").setOutputCol("norm").setP(1.0)
val pipeline = new Pipeline().setStages(Array(normalizer, dct))
// a bit puzzled: why should I call "fit" if I am just transforming columns?
// at this point this is the least of my problems ...
val model = pipeline.fit(d)
val dd = model.transform(d)
dd.show(5)
// +-------+--------------------+--------------------+--------------------+
// | vid| hist| norm| dct|
// +-------+--------------------+--------------------+--------------------+
// | 475830|[0.0,1.0,0.0,0.0,...|[0.0,0.5,0.0,0.0,...|[0.37796447300922...|
// | 298530|[0.0,0.0,1.0,1.0,...|[0.0,0.0,0.5,0.5,...|[0.37796447300922...|
// |3706430|[0.0,2.0,0.0,0.0,...|[0.0,1.0,0.0,0.0,...|[0.37796447300922...|
// |5770820|[1.0,0.0,0.0,0.0,...|[1.0,0.0,0.0,0.0,...|[0.37796447300922...|
// | 891180|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|[0.37796447300922...|
// +-------+--------------------+--------------------+--------------------+
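// (aside: Normalizer and DCT are both plain Transformers, so the same columns can
// be produced without fit() by chaining transform() calls directly; fit() is needed
// above only because Pipeline itself is an Estimator that returns a PipelineModel.
// the name below is just for illustration)
val dd_direct = dct.transform(normalizer.transform(d))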
// k-means here!
// bummer 6: k-means is not yet in ML, so there is no "fitting" to k-means clusters
// not an issue: back to MLlib to get the job done!
import org.apache.spark.mllib.clustering.KMeans
val f = dd.select("dct").rdd.map(r => r.getAs[Vector]("dct")).cache()
val numClusters = 10
val numIterations = 20
val clusters = KMeans.train(f, numClusters, numIterations)
// show the clusters in all their glory :)
clusters.clusterCenters.take(4).foreach(println)
// [0.37796447300922825,-0.40672737445206353,0.36529770485070134,-0.33499600248264894,0.24038285478039526,-0.20109941979499654,0.11627968707978312]
// [0.3779644730092282,-0.40370501114499235,0.11167131384472952,0.22008696706620345,-0.46678202966385474,0.5070864326593763,-0.3246375761355341]
// [0.37796447300922825,-0.009577269778527991,-0.16964806001109617,0.009083101707591823,0.1752818101815428,-0.0016188503764671164,-0.16579029329876688]
// [0.3779644730092282,-0.22053024891116707,-0.3107474508302649,0.4915304998280451,-0.110178523608113,-0.3924890556900048,0.4511667408123527]
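// optional sanity check (not part of the original run): within-set sum of squared
// errors of the trained model, handy when comparing different values of k
val wssse = clusters.computeCost(f)
println(s"within-set sum of squared errors = $wssse")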
// yay, we have got our model and the cluster centers
// now let's write the scoring either as a custom ML transformation or as a UDF ...
// going for the UDF
import org.apache.spark.sql.functions.udf
val kMeansScore = udf((v:Vector) => clusters.predict(v))
// add the predictions as an extra column, using a user-defined function
// remember that the UDF closes over clusters, which is serialized with the closure and shipped to the executors
val res = dd.withColumn("cluster", kMeansScore($"dct"))
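// (aside: closure serialization is fine for a small model like this one; a large
// model could instead be shared explicitly via a broadcast variable -- a sketch,
// names are illustrative only:)
// val bcClusters = sc.broadcast(clusters)
// val kMeansScoreBc = udf((v: Vector) => bcClusters.value.predict(v))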
// Done.
res.show(5)
// +-------+--------------------+--------------------+--------------------+-------+
// | vid| hist| norm| dct|cluster|
// +-------+--------------------+--------------------+--------------------+-------+
// | 475830|[0.0,1.0,0.0,0.0,...|[0.0,0.5,0.0,0.0,...|[0.37796447300922...| 0|
// | 298530|[0.0,0.0,1.0,1.0,...|[0.0,0.0,0.5,0.5,...|[0.37796447300922...| 2|
// |3706430|[0.0,2.0,0.0,0.0,...|[0.0,1.0,0.0,0.0,...|[0.37796447300922...| 5|
// |5770820|[1.0,0.0,0.0,0.0,...|[1.0,0.0,0.0,0.0,...|[0.37796447300922...| 8|
// | 891180|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|[0.37796447300922...| 3|
// +-------+--------------------+--------------------+--------------------+-------+
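// (aside: more recent Spark versions ship k-means as a spark.ml Estimator,
// org.apache.spark.ml.clustering.KMeans, so the MLlib training plus the scoring UDF
// above would collapse into a fit/transform pair and could even be appended to the
// pipeline; a hypothetical sketch, assuming a Spark version where it is available:)
// import org.apache.spark.ml.clustering.{KMeans => MLKMeans}
// val km = new MLKMeans().setK(numClusters).setMaxIter(numIterations).
//   setFeaturesCol("dct").setPredictionCol("cluster")
// val res2 = km.fit(dd).transform(dd)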
// Lessons learned.
// The good:
// we cleared the job: featurized, transformed, clustered, fit and scored,
// running the algorithm in a scalable fashion, without worrying about parallelism
// the spark-cassandra connector is great
// spark sql and DataFrame provide the right abstraction for data scientists
// ML and pipelines are a good intuition, especially when combined with Spark Streaming
//
// The bad:
// Spark SQL functions and support are still early days
// same for ML vs MLlib: more functions and transformations needed
// need more data science, more algorithms, better integration with Breeze
//
// The ugly
// SQL types are quite hard and unforgiving, especially when it comes to time
// the Spark MLlib vector implementation hides lots of the awesomeness of the breeze library
// still quite some friction to deal with: you might be tempted to just use RDDs
// and build your data science from there only
@michaelrkytch
Totally agree. I'm discovering all these issues one by one.
