Created
May 4, 2016 03:34
-
-
Save ocadaruma/f80413e2a25e3df49e835c3cffc75398 to your computer and use it in GitHub Desktop.
Spark - Multiple outputs, one per key
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example | |
import org.apache.hadoop.fs.Path | |
import org.apache.hadoop.io.{Text, NullWritable} | |
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat | |
import org.apache.spark.rdd.{PairRDDFunctions, RDD} | |
import org.apache.spark.{SparkConf, SparkContext} | |
/** A single page-view event.
  *
  * @param accountId account the view belongs to; the job in `Main` uses it as the output partition key
  * @param cookieId  cookie identifier of the viewer
  * @param timestamp time of the view (unit not shown here — presumably epoch millis; TODO confirm)
  */
final case class PageView(accountId: Long, cookieId: String, timestamp: Long)
/** Hadoop text output format that places each record under a directory derived from its key. */
class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {

  /** Replaces the record's key with `NullWritable` before it is handed to the text writer
    * (presumably so that only the value appears in the output line — Hadoop TextOutputFormat behavior).
    */
  override def generateActualKey(key: Any, value: Any): Any = {
    val replacementKey: Any = NullWritable.get()
    replacementKey
  }

  /** Returns `<key.toString>/<name>`, i.e. the default leaf file `name` nested in a per-key directory. */
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = {
    val keyDirectory = key.toString
    val nestedPath = new Path(keyDirectory, name)
    nestedPath.toString
  }
}
object Main {

  /** Example job: groups page views by (accountId, cookieId), aggregates each group,
    * and writes the resulting page views into one output directory per account id.
    *
    * The input RDD and the aggregation are left as `???` placeholders in this example.
    */
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("spark-example")
    val sc = new SparkContext(sparkConf)
    // Always stop the context, even when the job throws, so cluster resources are released.
    try {
      val pageViewRDD: RDD[PageView] = ???
      pageViewRDD
        .map(pv => ((pv.accountId, pv.cookieId), pv))
        .groupByKey()
        .flatMap { case ((accountId, cookieId), pvs) =>
          // Aggregate something using pvs
          // ...
          val result: List[PageView] = ???
          // Key each output record by accountId so it is written under that account's directory.
          result.map(pv => (accountId, pv))
        }
        .saveAsTextFileByKey("s3a://path/to/output")
    } finally {
      sc.stop()
    }
  }

  /** Adds a convenience save method to RDDs that can be viewed as pair RDDs. */
  implicit class RichRDD[T](val self: RDD[T]) extends AnyVal {

    /** Saves a pair RDD as text using [[RDDMultipleTextOutputFormat]], which nests each
      * record's output file in a directory named after `key.toString` and replaces the
      * written key with `NullWritable`.
      *
      * @param path base output directory
      * @param ev   view of this RDD as `PairRDDFunctions`, needed to reach `saveAsHadoopFile`
      */
    def saveAsTextFileByKey[K, V](path: String)(implicit ev: RDD[T] => PairRDDFunctions[K, V]): Unit =
      self.saveAsHadoopFile(path, classOf[NullWritable], classOf[Text], classOf[RDDMultipleTextOutputFormat])
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment