
@jlln
Last active February 24, 2016 03:34
How to pivot a Spark DataFrame and cast the values into a vector
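The snippet assumes event_data_ag1 is an aggregated DataFrame with one row per (Country, ElapsedMonths, CAMEO Code) combination and a Long "count" column. A minimal, hypothetical sketch of producing such a frame from a raw event DataFrame follows; the name event_data and its column types are assumptions, not part of the original gist.

// Hypothetical input preparation (not part of the original gist):
// event_data has columns "Country" (String), "ElapsedMonths" (Int) and "CAMEO Code" (Int).
val event_data_ag1 = event_data
  .groupBy("Country", "ElapsedMonths", "CAMEO Code")
  .count() // adds a Long "count" column, one row per combination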
import org.apache.spark.mllib.linalg.{DenseVector, SparseVector}
import sqlContext.implicits._ // needed for .toDF(); sqlContext is the SQLContext predefined in spark-shell

// Group rows by (Country, ElapsedMonths) and build, per group, a map of CAMEO code -> event count.
val cameo_maps = event_data_ag1.rdd
  .groupBy(x => (x.getAs[String]("Country"), x.getAs[Int]("ElapsedMonths")))
  .map { case (group_features, codes) =>
    group_features -> codes
      .map(code => code.getAs[Int]("CAMEO Code") -> code.getAs[Long]("count"))
      .toMap
  }

// Broadcast the sorted set of all CAMEO codes observed; this fixes the ordering of the vector dimensions.
val cameos = sc.broadcast(cameo_maps.map(_._2.keySet).reduce(_ union _).toArray.sorted)

// Expand each group's map into an array of counts over all codes, defaulting to 0 for absent codes.
val cameo_arrays = cameo_maps.map {
  case ((country, total_months), cameo_map) =>
    (country, total_months) -> cameos.value.map(cameo_map.getOrElse(_, 0L))
}

// Cast each count array into an MLlib vector (sparse, since most codes are absent for a given group).
val vectors = cameo_arrays.map { case (key, counts) =>
  key -> new DenseVector(counts.map(_.toDouble)).toSparse
}

// Rebuild a DataFrame with one row per (country, total_months) and its pivoted event vector.
case class VectorEntry(country: String, total_months: Int, event_vector: SparseVector)
val vectors_rebuilt = vectors.map {
  case ((country, total_months), vector) => VectorEntry(country, total_months, vector)
}
val vectors_df = vectors_rebuilt.toDF()
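To sanity-check the pivot or push the vectors into MLlib, a rough follow-up might look like the sketch below; the KMeans step and its parameters are purely illustrative and not part of the original gist.

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vector

// Inspect the pivoted frame: one row per (country, total_months) with a sparse event vector.
vectors_df.printSchema()
vectors_df.show(5)

// Illustrative downstream use: cluster (country, month) profiles by their CAMEO-code counts.
val training = vectors_df.select("event_vector").rdd.map(_.getAs[Vector]("event_vector")).cache()
val model = KMeans.train(training, 5, 20) // k = 5 clusters, 20 iterations (arbitrary choices)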