Skip to content

Instantly share code, notes, and snippets.

@ocadaruma
Created May 4, 2016 03:34
Show Gist options
  • Save ocadaruma/f80413e2a25e3df49e835c3cffc75398 to your computer and use it in GitHub Desktop.
Save ocadaruma/f80413e2a25e3df49e835c3cffc75398 to your computer and use it in GitHub Desktop.
Spark - Multiple outputs, one for each key
package com.example
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{Text, NullWritable}
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.spark.rdd.{PairRDDFunctions, RDD}
import org.apache.spark.{SparkConf, SparkContext}
/**
 * A single page-view record.
 *
 * @param accountId identifier of the account the view belongs to; `Main` groups by it
 *                  (together with `cookieId`) and uses it as the output key
 * @param cookieId  identifier of the cookie that produced the view; second half of the
 *                  grouping key used in `Main`
 * @param timestamp time of the view (NOTE(review): unit/epoch not shown in this file — confirm)
 */
final case class PageView(accountId: Long, cookieId: String, timestamp: Long)
/**
 * A `MultipleTextOutputFormat` that derives the output location of every record from
 * the record's key and replaces the key itself with `NullWritable`.
 */
class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {

  /**
   * Substitutes the `NullWritable` singleton for the record's real key.
   *
   * Intended to keep the key text out of the written lines (Hadoop's text record
   * writer skips `NullWritable` keys).
   *
   * @param key   the record's original key (ignored)
   * @param value the record's value (ignored)
   * @return the `NullWritable` singleton
   */
  override def generateActualKey(key: Any, value: Any): Any = {
    NullWritable.get()
  }

  /**
   * Builds the output file name for a record by nesting the default leaf name under
   * a parent path taken from the key's string form.
   *
   * @param key   the record's key; its `toString` becomes the parent path
   * @param value the record's value (ignored)
   * @param name  the default leaf file name supplied by the framework
   * @return the string form of `Path(key.toString, name)`
   */
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = {
    val keyDirectory = key.toString
    val target = new Path(keyDirectory, name)
    target.toString
  }
}
/**
 * Example Spark application: groups page views by `(accountId, cookieId)`, derives a
 * result list per group, and writes the results keyed by `accountId` through
 * [[RDDMultipleTextOutputFormat]].
 */
object Main {

  /**
   * Application entry point.
   *
   * Creates a `SparkContext` named "spark-example", runs the job, and always stops the
   * context afterwards — including when the job throws.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("spark-example")
    val sc = new SparkContext(sparkConf)

    // try/finally so the context is released even if the job fails part-way.
    try {
      val pageViewRDD: RDD[PageView] = ???

      pageViewRDD
        .map(pv => ((pv.accountId, pv.cookieId), pv))
        .groupByKey()
        .flatMap { case ((accountId, cookieId), pvs) =>
          // Aggregate something using pvs
          // ...
          val result: List[PageView] = ???
          // Re-key by account only; the key later selects the output location.
          result.map(pv => (accountId, pv))
        }
        .saveAsTextFileByKey("s3a://path/to/output")
    } finally {
      sc.stop()
    }
  }

  /**
   * Adds a convenience method to RDDs that can be viewed as pair RDDs.
   *
   * @param self the wrapped RDD
   * @tparam T element type of the wrapped RDD
   */
  implicit class RichRDD[T](val self: RDD[T]) extends AnyVal {

    /**
     * Saves the RDD as a Hadoop file using [[RDDMultipleTextOutputFormat]], declaring
     * `NullWritable` as the key class and `Text` as the value class.
     *
     * @param path base output path handed to `saveAsHadoopFile`
     * @param ev   implicit view of this RDD as `PairRDDFunctions[K, V]`
     * @tparam K key type of the pair view
     * @tparam V value type of the pair view
     */
    def saveAsTextFileByKey[K, V](path: String)(
        implicit ev: RDD[T] => PairRDDFunctions[K, V]): Unit =
      // Apply the view explicitly instead of relying on it as an implicit conversion.
      ev(self).saveAsHadoopFile(
        path,
        classOf[NullWritable],
        classOf[Text],
        classOf[RDDMultipleTextOutputFormat])
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment