@mlehman
Last active April 11, 2022 06:54
Hadoop MultipleOutputs on Spark Example
/* Example using MultipleOutputs to write a Spark RDD to multiple files.
   Based on saveAsNewAPIHadoopFile implemented in org.apache.spark.rdd.PairRDDFunctions
   and org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil.

   val values = sc.parallelize(List(
     ("fruit/items", "apple"),
     ("vegetable/items", "broccoli"),
     ("fruit/items", "pear"),
     ("fruit/items", "peach"),
     ("vegetable/items", "celery"),
     ("vegetable/items", "spinach")
   ))

   values.saveAsMultiTextFiles("tmp/food")

   OUTPUTS:

   tmp/food/fruit/items-r-00000
     apple
     pear
     peach

   tmp/food/vegetable/items-r-00000
     broccoli
     celery
     spinach
*/
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.hadoop.io.{DataInputBuffer, NullWritable, Text}
import org.apache.hadoop.mapred.RawKeyValueIterator
import org.apache.hadoop.mapreduce.counters.GenericCounter
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat, LazyOutputFormat, MultipleOutputs}
import org.apache.hadoop.mapreduce.task.{ReduceContextImpl, TaskAttemptContextImpl}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl.DummyReporter
import org.apache.hadoop.util.Progress
import org.apache.spark._
import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.spark.rdd.RDD
import org.apache.spark.Logging
import org.apache.hadoop.mapreduce._
import scala.reflect.ClassTag
object MultipleOutputsExample extends App with Logging {

  import MultiOutputRDD._

  val sc = new SparkContext("local", "MultiOutput Example")

  val values = sc.parallelize(List(
    ("fruit/items", "apple"),
    ("vegetable/items", "broccoli"),
    ("fruit/items", "pear"),
    ("fruit/items", "peach"),
    ("vegetable/items", "celery"),
    ("vegetable/items", "spinach")
  ))

  values.saveAsMultiTextFiles("tmp/food")

  sc.stop()
}
class MultiOutputRDD[K, V](self: RDD[(String, (K, V))])
    (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null)
  extends Logging with Serializable {

  /** Writes each value as a line of text, using the record's String key as the relative output path. */
  def saveAsMultiTextFiles(path: String) {
    new MultiOutputRDD(self.map(x => (x._1, (NullWritable.get, new Text(x._2._2.toString)))))
      .saveAsNewHadoopMultiOutputs[TextOutputFormat[NullWritable, Text]](path)
  }
  def saveAsNewHadoopMultiOutputs[F <: OutputFormat[K, V]](path: String, conf: Configuration = self.context.hadoopConfiguration)(implicit fm: ClassTag[F]) {
    // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
    val hadoopConf = conf
    val job = new Job(hadoopConf)
    job.setOutputKeyClass(kt.runtimeClass)
    job.setOutputValueClass(vt.runtimeClass)
    // LazyOutputFormat defers RecordWriter creation, so partitions that write nothing
    // don't leave empty default part files behind.
    LazyOutputFormat.setOutputFormatClass(job, fm.runtimeClass.asInstanceOf[Class[F]])
    job.getConfiguration.set("mapred.output.dir", path)
    saveAsNewAPIHadoopDatasetMultiOutputs(job.getConfiguration)
  }
  def saveAsNewAPIHadoopDatasetMultiOutputs(conf: Configuration) {
    // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
    val hadoopConf = conf
    val job = new Job(hadoopConf)
    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
    val jobtrackerID = formatter.format(new Date())
    val stageId = self.id
    val wrappedConf = new SerializableWritable(job.getConfiguration)
    val outfmt = job.getOutputFormatClass
    val jobFormat = outfmt.newInstance

    if (conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
      // FileOutputFormat ignores the filesystem parameter
      jobFormat.checkOutputSpecs(job)
    }

    // Runs once per partition: set up a Hadoop task attempt, write every record through
    // MultipleOutputs (using the String key as the base output path), then commit the task.
    val writeShard = (context: TaskContext, itr: Iterator[(String, (K, V))]) => {
      // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
      // around by taking a mod. We expect that no task will be attempted 2 billion times.
      val attemptNumber = (context.attemptId % Int.MaxValue).toInt
      /* "reduce task" <split #> <attempt # = spark task #> */
      val attemptId = new TaskAttemptID(jobtrackerID, stageId, TaskType.REDUCE, context.partitionId, attemptNumber)
      val hadoopContext = new TaskAttemptContextImpl(wrappedConf.value, attemptId)
      val format = outfmt.newInstance
      format match {
        case c: Configurable => c.setConf(wrappedConf.value)
        case _ => ()
      }
      val committer = format.getOutputCommitter(hadoopContext)
      committer.setupTask(hadoopContext)
      val recordWriter = format.getRecordWriter(hadoopContext).asInstanceOf[RecordWriter[K, V]]
      // MultipleOutputs expects a TaskInputOutputContext, so wrap the writer in a reduce context.
      val taskInputOutputContext = new ReduceContextImpl(wrappedConf.value, attemptId, new DummyIterator(itr),
        new GenericCounter, new GenericCounter, recordWriter, committer, new DummyReporter, null,
        kt.runtimeClass, vt.runtimeClass)
      val writer = new MultipleOutputs(taskInputOutputContext)
      try {
        while (itr.hasNext) {
          val pair = itr.next()
          writer.write(pair._2._1, pair._2._2, pair._1)
        }
      } finally {
        writer.close()
      }
      committer.commitTask(hadoopContext)
      1
    }: Int

    val jobAttemptId = new TaskAttemptID(jobtrackerID, stageId, TaskType.MAP, 0, 0)
    val jobTaskContext = new TaskAttemptContextImpl(wrappedConf.value, jobAttemptId)
    val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
    jobCommitter.setupJob(jobTaskContext)
    self.context.runJob(self, writeShard)
    jobCommitter.commitJob(jobTaskContext)
  }
  /** ReduceContextImpl requires a RawKeyValueIterator; this no-op implementation satisfies the
    * constructor while the actual records come from the Spark partition iterator. */
  class DummyIterator(itr: Iterator[_]) extends RawKeyValueIterator {
    def getKey: DataInputBuffer = null
    def getValue: DataInputBuffer = null
    def getProgress: Progress = null
    def next = itr.hasNext
    def close() { }
  }
}
object MultiOutputRDD {
  // Implicit conversion so any RDD[(String, V)] picks up saveAsMultiTextFiles.
  implicit def rddToMultiOutputRDD[V](rdd: RDD[(String, V)])(implicit vt: ClassTag[V]) = {
    new MultiOutputRDD(rdd.map(x => (x._1, (null, x._2))))
  }
}
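
To illustrate the generic saveAsNewHadoopMultiOutputs entry point as well, here is a minimal sketch that is not part of the original gist: it writes (Text, IntWritable) pairs as SequenceFiles, keyed by relative output path. The RDD name counts and the path "tmp/food-counts" are made up for the example, and it assumes the classes above plus a SparkContext named sc are in scope.

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat

// Each record's String key is still the relative output path under the base directory.
val counts = sc.parallelize(List(
  ("fruit/counts", (new Text("apple"), new IntWritable(3))),
  ("vegetable/counts", (new Text("celery"), new IntWritable(2)))
))
new MultiOutputRDD(counts)
  .saveAsNewHadoopMultiOutputs[SequenceFileOutputFormat[Text, IntWritable]]("tmp/food-counts")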

vballaji commented Oct 6, 2014

Hello, I tried applying this code snippet to write data from one RDD into multiple locations, but the process takes a long time (2 hours for this step alone). Is there a way to make it run in parallel? It feels like it runs sequentially. Note: I am working with about 300 GB of data. Any pointers would be appreciated.
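
A minimal sketch of one option, assuming the code above: the save runs one write task per partition (self.context.runJob(self, writeShard)), so parallelism is bounded by the input RDD's partition count, and repartitioning before the save spreads the work across more tasks. The partition count of 200 below is only illustrative.

// Illustrative partition count: more partitions means more tasks writing output files in parallel.
values.repartition(200).saveAsMultiTextFiles("tmp/food")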


pkallos commented Jan 27, 2015

👍

@xiaomozhang

👍


hn5092 commented Feb 26, 2016

thx

@ayoub-benali

Thanks.
Just a note regarding Spark 2.4.4: you have to use context.taskAttemptId instead of context.attemptId.
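
A minimal sketch of that change for newer Spark versions, assuming the rest of the writeShard closure stays the same:

// Spark 2.x: TaskContext.attemptId is gone; taskAttemptId returns the unique task attempt ID.
val attemptNumber = (context.taskAttemptId % Int.MaxValue).toInt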
