Skip to content

Instantly share code, notes, and snippets.

@bepcyc
Created September 1, 2015 07:35
Show Gist options
  • Select an option

  • Save bepcyc/27ccbc1fa4319ff183ed to your computer and use it in GitHub Desktop.

Select an option

Save bepcyc/27ccbc1fa4319ff183ed to your computer and use it in GitHub Desktop.
package com.avira.ds.sparser.spark
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.spark.{SparkContext, SparkConf}
import scala.language.implicitConversions
/** Common supertype of the events produced by the sample job; sealed so all variants live in this file. */
sealed trait Event

/** A click event carrying a free-form text payload. */
final case class ClickEvent(blaBla: String) extends Event

/** A view event carrying a free-form text payload. */
final case class ViewEvent(blaBla: String) extends Event
/**
 * Text output format that places each record in a sub-directory chosen from the
 * runtime type of the record's value.
 */
class RDDMultipleOutputFormat extends MultipleTextOutputFormat[Any, Any] {

  /**
   * Maps a record value to the directory name used for its output files.
   *
   * @param x the record value
   * @return `"click_event"` for a [[ClickEvent]], `"view_event"` for a [[ViewEvent]],
   *         and `"some_event"` for anything else
   */
  def mapEventAsPath(x: Any): String = x match {
    case _: ClickEvent => "click_event"
    case _: ViewEvent  => "view_event"
    case _             => "some_event"
  }

  /**
   * Replaces every key with the `NullWritable` singleton, discarding the original key.
   * NOTE(review): presumably this makes the text writer emit only the value — confirm
   * against the Hadoop `TextOutputFormat` behaviour for null keys.
   */
  override def generateActualKey(key: Any, value: Any): AnyRef = NullWritable.get()

  /**
   * Builds the relative output path for a record as `<event directory>/<name>`.
   *
   * The directory is derived from `value` itself. Classifying the concatenated string
   * `name + "/" + value` (as this method previously did) can never match an event type,
   * so every record ended up under `"some_event"` and the leaf name was lost.
   *
   * @param key   the record key (unused)
   * @param value the record value whose type selects the directory
   * @param name  the file name proposed by the framework
   * @return the directory for `value` followed by `/` and `name`
   */
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    s"${mapEventAsPath(value)}/$name"
}
/** Sample driver that writes randomly generated events through [[RDDMultipleOutputFormat]]. */
object Test {

  /**
   * Creates a Spark context named "test", generates 100 random click/view events and
   * saves them under `data_output` using [[RDDMultipleOutputFormat]].
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("test")
    val sc = new SparkContext(conf)
    // Stop the context even when the job fails, so it is not leaked.
    try {
      // The explicit element type widens both tuple variants to (String, Event).
      val events: Seq[(String, Event)] = (1 to 100).map { _ =>
        val r = math.random
        if (r < 0.5) (s"key$r", ClickEvent(s"I am click $r"))
        else (s"key$r", ViewEvent(s"I am view $r"))
      }
      sc.parallelize(events)
        .saveAsHadoopFile("data_output",
          classOf[String], classOf[Event], classOf[RDDMultipleOutputFormat])
    } finally {
      sc.stop()
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment