Created October 10, 2015 04:38
An example of pivoting a Spark RDD, based on an answer to a Stack Overflow question asking how to do this. The answer was written in Python, so I basically translated it to Scala.
/*
  This Pivot sample is based on the 5th answer given on:
  http://stackoverflows.xyz/questions/30260015/reshaping-pivoting-data-in-spark-rdd-and-or-spark-dataframes

  The answer above was written in Python, which I don't know very well. In addition, my Spark-Fu
  is still somewhat introductory in some areas. To help with other aspects of translating the Python
  sample I used these references:

  http://codingjunkie.net/spark-agr-by-key/
  http://scala4fun.tumblr.com/post/84792374567/mergemaps
  http://alvinalexander.com/scala/how-sort-scala-sequences-seq-list-array-buffer-vector-ordering-ordered
*/
import scala.collection._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
// Sample data: (ID, Age, Country, Count)
val rdd = sc.parallelize(List(("X01", 41, "US", 3),
                              ("X01", 41, "UK", 1),
                              ("X01", 41, "CA", 2),
                              ("X02", 72, "US", 4),
                              ("X02", 72, "UK", 6),
                              ("X02", 72, "CA", 7),
                              ("X02", 72, "XX", 8)))
// Convert each tuple to a Row and key the RDD by the ID column.
val krows = rdd.map(t => Row.fromSeq(t.productIterator.toList)).keyBy(_(0).toString)
// Sequence function: fold one Row into the per-key map, adding the
// country (r(2)) -> count (r(3)) entry plus the Age column (r(1)).
def seqPivot(m: mutable.Map[String, Any], r: Row): mutable.Map[String, Any] = {
  m += (r(2).toString -> r(3))
  m += ("Age" -> r(1))
  m
}

// Combiner function: merge the per-partition maps for a key.
def cmbPivot(m1: mutable.Map[String, Any], m2: mutable.Map[String, Any]): mutable.Map[String, Any] = {
  m1 ++= m2
  m1
}
val pivoted = krows.aggregateByKey(mutable.Map.empty[String, Any])(seqPivot, cmbPivot)

// The sorted union of all keys across the pivoted maps gives the column order.
val orderedColnames = pivoted.values.map(v => v.keys.toSet).reduce((s, t) => s.union(t)).toSeq.sortWith(_ < _)
val schema = StructType(List(StructField("ID", StringType, true)) ++ (for (c <- orderedColnames) yield StructField(c, IntegerType, true)))

// Build one Row per key, filling columns missing for that key with null.
val keyedRows = pivoted.map(t => List(t._1) ++ (for (c <- orderedColnames) yield t._2.getOrElse(c, null))).map(row => Row.fromSeq(row))
val result = sqlContext.createDataFrame(keyedRows, schema)
result.show
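For readers without a Spark shell handy, the same aggregation logic can be sketched with plain Scala collections. This is only an illustrative sketch, not part of the original gist: `groupBy` plays the role of keying by ID, and the fold inside each group mirrors what `seqPivot`/`cmbPivot` do inside `aggregateByKey`.

```scala
import scala.collection.mutable

// Same sample data as the Spark version: (ID, Age, Country, Count).
val data = List(("X01", 41, "US", 3), ("X01", 41, "UK", 1), ("X01", 41, "CA", 2),
                ("X02", 72, "US", 4), ("X02", 72, "UK", 6), ("X02", 72, "CA", 7),
                ("X02", 72, "XX", 8))

// Group by ID, then fold each group into a column -> value map,
// which is what aggregateByKey does per key in the Spark code.
val pivoted: Map[String, mutable.Map[String, Any]] =
  data.groupBy(_._1).map { case (id, rows) =>
    val m = mutable.Map.empty[String, Any]
    rows.foreach { case (_, age, country, count) =>
      m += (country -> count)   // pivot the country into a column
      m += ("Age" -> age)       // carried along for every row of the key
    }
    id -> m
  }

// The sorted union of all keys gives the pivoted column set.
val cols = pivoted.values.flatMap(_.keys).toSet.toSeq.sorted
```

Note that, as in the Spark version, keys absent for an ID (here `"XX"` for `"X01"`) simply have no entry in that ID's map, which is why the DataFrame step fills them with `null` via `getOrElse`.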