@silasdavis
Last active January 18, 2022 07:07
Wrapping OutputFormat to produce multiple outputs with hadoop MultipleOutputs
/**
* This file contains the core idea: wrapping an underlying OutputFormat in an OutputFormat with an
* augmented key, which writes to different partitions using MultipleOutputs (or something similar)
*/
package model.hadoop
import model.hadoop.HadoopIO.MultipleOutputer
import model.hadoop.HadoopIO.MultipleOutputer._
import org.apache.hadoop.io.{DataInputBuffer, NullWritable}
import org.apache.hadoop.mapred.RawKeyValueIterator
import org.apache.hadoop.mapreduce.counters.GenericCounter
import org.apache.hadoop.mapreduce.lib.output.{LazyOutputFormat, MultipleOutputs}
import org.apache.hadoop.mapreduce.task.ReduceContextImpl
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl.DummyReporter
import org.apache.hadoop.mapreduce.{Job, _}
import org.apache.hadoop.util.Progress
object MultipleOutputsFormat {
// Type inference fails with this inlined in constructor parameters
private def defaultMultipleOutputsMaker[K, V](io: TaskInputOutputContext[_, _, K, V]): MultipleOutputer[K, V] =
new MultipleOutputs[K, V](io)
}
/**
* Subclass this to create an OutputFormat that writes multiple outputs. The subclass must have a nullary constructor so
* hadoop can construct it with `.newInstance`, ugh...
*
* The output format expects a two-part key of the form (outputPath, actualKey); the string outputPath is used to
* partition the output into different directories ('/'-separated filenames).
*
* For some reason MultipleOutputs does not work with Avro, but the near-identical AvroMultipleOutputs does. Irritatingly
* these obviously related classes have no common ancestor so they are combined under the MultipleOutputer type class
* which at least allows for future extension.
*
* @param outputFormat the underlying OutputFormat responsible for the actual writing
* @param multipleOutputsMaker factory method for constructing an object implementing the MultipleOutputer trait
* @tparam K key type of the underlying OutputFormat
* @tparam V value type of the underlying OutputFormat
*/
abstract class MultipleOutputsFormat[K, V]
(outputFormat: OutputFormat[K, V],
multipleOutputsMaker: TaskInputOutputContext[_, _, K, V] => MultipleOutputer[K, V] =
(r: TaskInputOutputContext[_, _, K, V]) => MultipleOutputsFormat.defaultMultipleOutputsMaker[K, V](r))
extends OutputFormat[(String, K), V] {
override def checkOutputSpecs(context: JobContext): Unit = outputFormat.checkOutputSpecs(context)
override def getOutputCommitter(context: TaskAttemptContext): OutputCommitter = outputFormat
.getOutputCommitter(context)
override def getRecordWriter(context: TaskAttemptContext): RecordWriter[(String, K), V] =
new RecordWriter[(String, K), V] {
val job = Job.getInstance(context.getConfiguration)
// Set underlying output format using lazy output
LazyOutputFormat.setOutputFormatClass(job, outputFormat.getClass)
// We pass a ReduceContext with most fields dummied-out since they will not be used in the context
// of Spark's saveAs*Hadoop* methods
val ioContext = new ReduceContextImpl(job.getConfiguration, context.getTaskAttemptID,
new DummyIterator, new GenericCounter, new GenericCounter,
new DummyRecordWriter, new DummyOutputCommitter, new DummyReporter, null,
classOf[NullWritable], classOf[NullWritable])
val multipleOutputs: MultipleOutputer[K, V] = multipleOutputsMaker(ioContext)
override def write(keys: (String, K), value: V): Unit = {
keys match {
case (path, key) =>
multipleOutputs.write(key, value, path)
}
}
override def close(context: TaskAttemptContext): Unit = multipleOutputs.close()
}
private class DummyOutputCommitter extends OutputCommitter {
override def setupJob(jobContext: JobContext): Unit = ()
override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = false
override def setupTask(taskContext: TaskAttemptContext): Unit = ()
override def commitTask(taskContext: TaskAttemptContext): Unit = ()
override def abortTask(taskContext: TaskAttemptContext): Unit = ()
}
private class DummyRecordWriter extends RecordWriter[K, V] {
override def write(key: K, value: V): Unit = ()
override def close(context: TaskAttemptContext): Unit = ()
}
private class DummyIterator extends RawKeyValueIterator {
override def getKey: DataInputBuffer = null
override def getValue: DataInputBuffer = null
override def getProgress: Progress = null
override def close(): Unit = ()
override def next: Boolean = true
}
}
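/**
 * A minimal usage sketch (added for illustration, not part of the original gist): with a concrete
 * subclass such as the MultipleTextOutputsFormat defined further down, an RDD keyed by the augmented
 * (outputPath, actualKey) pair can be handed straight to Spark's saveAsNewAPIHadoopFile, and each
 * distinct path string becomes its own subdirectory under the base output path. The object and
 * parameter names here are assumptions.
 */
object MultipleOutputsFormatUsageSketch {
  import org.apache.hadoop.io.{NullWritable, Text}
  import org.apache.spark.rdd.RDD

  def saveByPath(records: RDD[(String, String)], basePath: String): Unit =
    records
      // augment the key: the first element routes the record, the second is the real key
      .map { case (path, line) => ((path, NullWritable.get), new Text(line)) }
      // MultipleTextOutputsFormat is an OutputFormat[(String, NullWritable), Text]
      .saveAsNewAPIHadoopFile[MultipleTextOutputsFormat](basePath)
}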
/**
* The code below gives some more context for the MultipleOutputsFormat class
*/
// This type class wraps MultipleOutputs-style writers
package model.hadoop
import org.apache.avro.mapreduce.AvroMultipleOutputs
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs
object HadoopIO {
/**
* This is a work-around for the fact that Avro has its own non-type-compatible version of MultipleOutputs.
* It also allows the functionality of MultipleOutputsFormat to be extended to other multiple-output writers by
* providing the missing common interface as a trait.
*/
trait MultipleOutputer[K, V] {
def write(key: K, value: V, path: String): Unit
def close(): Unit
}
object MultipleOutputer {
implicit class AvroMultipleOutputer[K, V](mo: AvroMultipleOutputs) extends MultipleOutputer[K, V] {
def write(key: K, value: V, path: String): Unit = mo.write(key, value, path)
def close(): Unit = mo.close()
}
implicit class PlainMultipleOutputer[K, V](mo: MultipleOutputs[K, V]) extends MultipleOutputer[K, V] {
def write(key: K, value: V, path: String): Unit = mo.write(key, value, path)
def close(): Unit = mo.close()
}
}
}
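/**
 * A short sketch (added for illustration, not part of the original gist): the implicit classes above
 * adapt both writers to MultipleOutputer, so either can be returned where the type class is expected,
 * which is what MultipleOutputsFormat's multipleOutputsMaker parameter relies on. Method names here
 * are assumptions.
 */
object MultipleOutputerSketch {
  import model.hadoop.HadoopIO.MultipleOutputer
  import model.hadoop.HadoopIO.MultipleOutputer._
  import org.apache.avro.mapred.AvroKey
  import org.apache.avro.mapreduce.AvroMultipleOutputs
  import org.apache.hadoop.io.{NullWritable, Text}
  import org.apache.hadoop.mapreduce.TaskInputOutputContext
  import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs

  // The plain Hadoop writer is adapted by PlainMultipleOutputer...
  def plain(ctx: TaskInputOutputContext[_, _, Text, Text]): MultipleOutputer[Text, Text] =
    new MultipleOutputs[Text, Text](ctx)

  // ...and the Avro writer by AvroMultipleOutputer, despite the two sharing no common ancestor.
  def avro[T](ctx: TaskInputOutputContext[_, _, AvroKey[T], NullWritable]): MultipleOutputer[AvroKey[T], NullWritable] =
    new AvroMultipleOutputs(ctx)
}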
// SPECIFIC MULTIPLE OUTPUT FORMAT CLASSES
// For Parquet
package model.hadoop
import org.apache.avro.generic.GenericContainer
import parquet.hadoop.ParquetOutputFormat
class MultipleParquetOutputsFormat[T <: GenericContainer]
extends MultipleOutputsFormat(new ParquetOutputFormat[T]) {
}
// For Avro
package model.hadoop
import model.hadoop.HadoopIO.MultipleOutputer
import org.apache.avro.generic.GenericContainer
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.{AvroKeyOutputFormat, AvroMultipleOutputs}
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.TaskInputOutputContext
object MultipleAvroOutputsFormat {
// This seems to be an unfortunate limitation of type inference for lambda defaults within constructor params.
// If it worked, I would just inline this function.
def amoMaker[T](io: TaskInputOutputContext[_, _, AvroKey[T], NullWritable]):
MultipleOutputer[AvroKey[T], NullWritable] = new AvroMultipleOutputs(io)
}
class MultipleAvroOutputsFormat[T <: GenericContainer]
extends MultipleOutputsFormat(new AvroKeyOutputFormat[T],
(io: TaskInputOutputContext[_, _, AvroKey[T], NullWritable]) => MultipleAvroOutputsFormat.amoMaker(io)) {
}
// For Text
package model.hadoop
import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
class MultipleTextOutputsFormat extends MultipleOutputsFormat(new TextOutputFormat[NullWritable, Text]) {
}
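// For SequenceFiles (a hedged sketch added for illustration, not part of the original gist): any other
// OutputFormat can be wrapped in the same way; it only needs another nullary-constructor subclass.
// The class name below is an assumption.
package model.hadoop
import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
class MultipleSequenceFileOutputsFormat
  extends MultipleOutputsFormat(new SequenceFileOutputFormat[NullWritable, Text]) {
}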
// END OF SPECIFIC MULTIPLE OUTPUTS FORMATS
// These extensions show how to read and write Avro RDDs and use MultipleOutputsFormat
package model.spark.extensions
import model.hadoop.{MultipleAvroOutputsFormat, MultipleParquetOutputsFormat, MultipleTextOutputsFormat}
import model.util.TextLike
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.{AvroJob, AvroKeyInputFormat, AvroKeyOutputFormat}
import org.apache.avro.specific.SpecificData
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapreduce.Job
import org.apache.log4j.Logger
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import parquet.avro.{AvroParquetOutputFormat, AvroWriteSupport}
import parquet.hadoop.ParquetOutputFormat
import scala.reflect.{ClassTag, _}
object SerializationExtensions {
val logger = Logger.getLogger(getClass)
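/**
 * Configures the given Job for Avro/Parquet I/O against the Avro schema of T: the schema is set for
 * both the input and output keys, Parquet is pointed at AvroWriteSupport with dictionary encoding
 * disabled, and AvroParquetOutputFormat is given the same schema.
 */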
def avroParquetJob[T: ClassTag](job: Job = Job.getInstance(new Configuration())): Job = {
val schema: Schema = SpecificData.get.getSchema(classTag[T].runtimeClass)
AvroJob.setInputKeySchema(job, schema)
AvroJob.setOutputKeySchema(job, schema)
ParquetOutputFormat.setWriteSupportClass(job, classOf[AvroWriteSupport])
ParquetOutputFormat.setEnableDictionary(job, false)
AvroParquetOutputFormat.setSchema(job, schema)
job
}
def definedOrLog(record: GenericRecord, field: String): Boolean = {
if (record.get(field) != null) return true
logger.warn(s"Expected field '$field' to be defined, but it was not on record of type '${record.getClass}'")
false
}
implicit class TextKeyRDD[K, V: TextLike](rdd: RDD[(K, V)]) {
def saveAsMultipleTextFiles(outputKeyFunction: (K) => String, outputPath: String): Unit = {
rdd.map{case (k,v) => ((outputKeyFunction(k), NullWritable.get),v)}.map {
case (k, v: String) => (k, new Text(v))
case (k, v: Array[Byte]) => (k, new Text(v))
case (k, v: Text) => (k, new Text(v))
}.saveAsNewAPIHadoopFile[MultipleTextOutputsFormat](outputPath)
}
}
implicit class AvroRDDSparkContext(val sparkContext: SparkContext) extends AnyVal {
def avroFile[T: ClassTag](path: String): RDD[T] = {
sparkContext.newAPIHadoopFile(path,
classOf[AvroKeyInputFormat[T]],
classOf[AvroKey[T]],
classOf[NullWritable],
avroParquetJob().getConfiguration)
.map[T](_._1.datum())
}
}
implicit class AvroRDD[T <: GenericRecord : ClassTag](val avroRDD: RDD[T]) {
/**
* Filters out Avro records on which any of the given fields is undefined (null), logging each such record
*/
def filterIfUnexpectedNull(fields: String*): RDD[T] = {
avroRDD.filter(r => fields.forall(definedOrLog(r, _)))
}
def saveAsParquetFile(outputPath: String): Unit = {
avroRDD.map((null, _))
.saveAsNewAPIHadoopFile(outputPath,
classOf[Null],
classTag[T].runtimeClass,
classOf[ParquetOutputFormat[T]],
avroParquetJob[T]().getConfiguration)
}
def saveAsMultipleParquetFiles(outputKeyFunction: (T) => String, outputPath: String): Unit = {
avroRDD.map(r => ((outputKeyFunction(r), null), r))
.saveAsNewAPIHadoopFile(outputPath,
classOf[Null],
classTag[T].runtimeClass,
classOf[MultipleParquetOutputsFormat[T]],
avroParquetJob[T]().getConfiguration)
}
def saveAsAvroFile(outputPath: String): Unit = {
avroRDD.map(r => (new AvroKey[T](r), NullWritable.get))
.saveAsNewAPIHadoopFile(outputPath,
classOf[AvroKey[T]],
classOf[NullWritable],
classOf[AvroKeyOutputFormat[T]],
avroParquetJob[T]().getConfiguration)
}
def saveAsMultipleAvroFiles(outputKeyFunction: (T) => String, outputPath: String): Unit = {
avroRDD.map(r => ((outputKeyFunction(r), new AvroKey(r)), NullWritable.get))
.saveAsNewAPIHadoopFile(outputPath,
classOf[AvroKey[(String, T)]],
classOf[NullWritable],
classOf[MultipleAvroOutputsFormat[T]],
avroParquetJob[T]().getConfiguration)
}
}
}
package model.hadoop
import java.io.File
import java.util.UUID
import com.google.common.io.Files
import avro.UUIDUtils
import avro.geoip.{Lookup, Subdivision}
import avro.gulp.UsageFragment
import model.spark.extensions.SerializationExtensions._
import model.spark.extensions.SparkConfigurationExtensions._
import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfterAll, FunSuite}
import scala.collection.JavaConverters._
object TestUtil {
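/**
 * Builds a '/'-separated output path from arbitrary key parts: collections are joined with ',',
 * avro UUIDs are rendered via their java.util.UUID form, and everything else via toString.
 * For example (illustrative values): mkPath(List("en_GB", "fr_FR"), "UK") == "en_GB,fr_FR/UK".
 */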
def mkPath(ks: Any*): String = {
def keyMap(k: Any): String = k match {
case xs: TraversableOnce[_] => xs.mkString(",")
case id: avro.UUID => UUIDUtils.toJavaUUID(id).toString
case x => x.toString
}
ks.map(keyMap).mkString("/")
}
}
class MultipleOutputsFormatTest extends FunSuite with BeforeAndAfterAll {
val localAvroFile = "/home/silas/ephemera/filtered-frag-delete.avro"
val sparkConf = new SparkConf()
.setMaster("local")
.setAppName("Multiple Outputs Test")
.configureAvroSerialization()
val sc = new SparkContext(sparkConf)
val outputPath = "tmp"
val outputFile = new File(outputPath)
outputFile.mkdirs()
Files.fileTreeTraverser().postOrderTraversal(outputFile).asScala.foreach(_.delete)
def usageFragment(user: String, lang: String, country: String): UsageFragment =
UsageFragment.newBuilder()
.setId(UUIDUtils.fromJavaUUID(UUID.fromString(user)))
.setCloudUserId(UUIDUtils.fromJavaUUID(UUID.randomUUID()))
.setSubmissionTimestamp(System.currentTimeMillis())
.setEndTimestamp(System.currentTimeMillis() + 10000)
.setEnabledLanguages(List(lang).asJava)
.setGeoIP(Lookup.newBuilder()
.setCountry(country)
.setSubdivisions(List(Subdivision.newBuilder()
.setName("foo")
.build()).asJava).build()).build()
val usageFragments = List[UsageFragment](
usageFragment("3ebb0e0d-891e-4675-9ec9-3f3665195048","en_GB", "UK"),
usageFragment("3ebb0e0d-891e-4675-9ec9-3f3665195048","fr_FR", "France"),
usageFragment("3ebb0e0d-891e-4675-9ec9-3f3665195048","fr_FR", "France"),
usageFragment("3ebb0e0d-891e-4675-9ec9-3f3665195048","fr_FR", "France"),
usageFragment("ea7fbcce-98ea-4e44-97b2-d4783a95a700","en_GB", "UK"),
usageFragment("ea7fbcce-98ea-4e44-97b2-d4783a95a700","es_LA", "UK"),
usageFragment("ea7fbcce-98ea-4e44-97b2-d4783a95a700","es_LA", "UK"),
usageFragment("ea7fbcce-98ea-4e44-97b2-d4783a95a700","es_LA", "UK"),
usageFragment("af415c0a-aa72-4f91-8118-9e8eb377cfc2","es_EC", "Ecuador"),
usageFragment("af415c0a-aa72-4f91-8118-9e8eb377cfc2","es_GT", "Guatemala"),
usageFragment("af415c0a-aa72-4f91-8118-9e8eb377cfc2","es_GT", "Guatemala"),
usageFragment("7f45d45c-2e10-4355-9ea7-69b99bbcd4fa","en_GB", "UK"),
usageFragment("7f45d45c-2e10-4355-9ea7-69b99bbcd4fa","fr_FR", "UK"),
usageFragment("ea7fbcce-98ea-4e44-97b2-d4783a95a700","en_GB", "UK")
)
test("Test multiple text outputs") {
val values = sc.parallelize(List(
("fruit/items", "apple"),
("vegetable/items", "broccoli"),
("fruit/items", "pear"),
("fruit/items", "peach"),
("vegetable/items", "celery"),
("vegetable/items", "spinach")
))
values.saveAsMultipleTextFiles(s => s, s"$outputPath/text")
}
test("Test multiple avro outputs") {
val values = sc.parallelize(usageFragments)
values.saveAsMultipleAvroFiles(uf =>
TestUtil.mkPath(uf.getGeoIP.getCountry, uf.getCloudUserId),
s"$outputPath/avro")
}
test("Test multiple parquet outputs") {
val values = sc.parallelize(usageFragments)
values.saveAsMultipleParquetFiles(uf =>
TestUtil.mkPath(uf.getEnabledLanguages.asScala, uf.getCloudUserId, uf.getGeoIP.getCountry),
s"$outputPath/parquet")
}
override protected def afterAll(): Unit = sc.stop()
}