Last active
April 11, 2022 06:54
-
-
Save mlehman/df9546f6be2e362bbad2 to your computer and use it in GitHub Desktop.
Hadoop MultipleOutputs on Spark Example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* Example using MultipleOutputs to write a Spark RDD to multiples files. | |
Based on saveAsNewAPIHadoopFile implemented in org.apache.spark.rdd.PairRDDFunctions, org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil. | |
val values = sc.parallelize(List( | |
("fruit/items", "apple"), | |
("vegetable/items", "broccoli"), | |
("fruit/items", "pear"), | |
("fruit/items", "peach"), | |
("vegetable/items", "celery"), | |
("vegetable/items", "spinach") | |
)) | |
values.saveAsMultiTextFiles("tmp/food") | |
OUTPUTS: | |
tmp/food/fruit/items-r-00000 | |
apple | |
pear | |
peach | |
tmp/food/vegetable/items-r-00000 | |
broccoli | |
celery | |
spinach | |
*/ | |
import java.text.SimpleDateFormat | |
import java.util.Date | |
import org.apache.hadoop.io.{DataInputBuffer, NullWritable, Text} | |
import org.apache.hadoop.mapred.RawKeyValueIterator | |
import org.apache.hadoop.mapreduce.counters.GenericCounter | |
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat, LazyOutputFormat, MultipleOutputs} | |
import org.apache.hadoop.mapreduce.task.{ReduceContextImpl, TaskAttemptContextImpl} | |
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl.DummyReporter | |
import org.apache.hadoop.util.Progress | |
import org.apache.spark._ | |
import org.apache.hadoop.conf.{Configurable, Configuration} | |
import org.apache.spark.rdd.RDD | |
import org.apache.spark.Logging | |
import org.apache.hadoop.mapreduce._ | |
import scala.reflect.ClassTag | |
object MultipleOutputsExample extends App with Logging { | |
import MultiOutputRDD._ | |
val sc = new SparkContext("local", "MulitOutput Example") | |
val values = sc.parallelize(List( | |
("fruit/items", "apple"), | |
("vegetable/items", "broccoli"), | |
("fruit/items", "pear"), | |
("fruit/items", "peach"), | |
("vegetable/items", "celery"), | |
("vegetable/items", "spinach") | |
)) | |
values.saveAsMultiTextFiles("tmp/food") | |
sc.stop() | |
} | |
class MultiOutputRDD[K, V](self: RDD[(String, (K, V))]) | |
(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) | |
extends Logging with Serializable { | |
def saveAsMultiTextFiles(path: String) { | |
new MultiOutputRDD(self.map(x => (x._1, (NullWritable.get, new Text(x._2._2.toString))))) | |
.saveAsNewHadoopMultiOutputs[TextOutputFormat[NullWritable, Text]](path) | |
} | |
def saveAsNewHadoopMultiOutputs[F <: OutputFormat[K, V]](path: String, conf: Configuration = self.context.hadoopConfiguration)(implicit fm: ClassTag[F]) { | |
// Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038). | |
val hadoopConf = conf | |
val job = new Job(hadoopConf) | |
job.setOutputKeyClass(kt.runtimeClass) | |
job.setOutputValueClass(vt.runtimeClass) | |
LazyOutputFormat.setOutputFormatClass(job, fm.runtimeClass.asInstanceOf[Class[F]]) | |
job.getConfiguration.set("mapred.output.dir", path) | |
saveAsNewAPIHadoopDatasetMultiOutputs(job.getConfiguration) | |
} | |
def saveAsNewAPIHadoopDatasetMultiOutputs(conf: Configuration) { | |
// Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038). | |
val hadoopConf = conf | |
val job = new Job(hadoopConf) | |
val formatter = new SimpleDateFormat("yyyyMMddHHmm") | |
val jobtrackerID = formatter.format(new Date()) | |
val stageId = self.id | |
val wrappedConf = new SerializableWritable(job.getConfiguration) | |
val outfmt = job.getOutputFormatClass | |
val jobFormat = outfmt.newInstance | |
if (conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) { | |
// FileOutputFormat ignores the filesystem parameter | |
jobFormat.checkOutputSpecs(job) | |
} | |
val writeShard = (context: TaskContext, itr: Iterator[(String, (K, V))]) => { | |
// Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it | |
// around by taking a mod. We expect that no task will be attempted 2 billion times. | |
val attemptNumber = (context.attemptId % Int.MaxValue).toInt | |
/* "reduce task" <split #> <attempt # = spark task #> */ | |
val attemptId = new TaskAttemptID(jobtrackerID, stageId, TaskType.REDUCE, context.partitionId, attemptNumber) | |
val hadoopContext = new TaskAttemptContextImpl(wrappedConf.value, attemptId) | |
val format = outfmt.newInstance | |
format match { | |
case c: Configurable => c.setConf(wrappedConf.value) | |
case _ => () | |
} | |
val committer = format.getOutputCommitter(hadoopContext) | |
committer.setupTask(hadoopContext) | |
val recordWriter = format.getRecordWriter(hadoopContext).asInstanceOf[RecordWriter[K, V]] | |
val taskInputOutputContext = new ReduceContextImpl(wrappedConf.value, attemptId, new DummyIterator(itr), new GenericCounter, new GenericCounter, | |
recordWriter, committer, new DummyReporter, null, kt.runtimeClass, vt.runtimeClass) | |
val writer = new MultipleOutputs(taskInputOutputContext) | |
try { | |
while (itr.hasNext) { | |
val pair = itr.next() | |
writer.write(pair._2._1, pair._2._2, pair._1) | |
} | |
} finally { | |
writer.close() | |
} | |
committer.commitTask(hadoopContext) | |
1 | |
}: Int | |
val jobAttemptId = new TaskAttemptID(jobtrackerID, stageId, TaskType.MAP, 0, 0) | |
val jobTaskContext = new TaskAttemptContextImpl(wrappedConf.value, jobAttemptId) | |
val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext) | |
jobCommitter.setupJob(jobTaskContext) | |
self.context.runJob(self, writeShard) | |
jobCommitter.commitJob(jobTaskContext) | |
} | |
class DummyIterator(itr: Iterator[_]) extends RawKeyValueIterator { | |
def getKey: DataInputBuffer = null | |
def getValue: DataInputBuffer = null | |
def getProgress: Progress = null | |
def next = itr.hasNext | |
def close() { } | |
} | |
} | |
object MultiOutputRDD { | |
implicit def rddToMultiOutputRDD[V](rdd: RDD[(String, V)])(implicit vt: ClassTag[V]) = { | |
new MultiOutputRDD(rdd.map(x => (x._1, (null, x._2)))) | |
} | |
} |
thx
Thanks,
Just a note regarding spark 2.4.4, you have to use context.taskAttemptId
instead of context.attemptId
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
👍