Wrapping OutputFormat to produce multiple outputs with Hadoop MultipleOutputs
/**
 * This file contains the core idea of wrapping an underlying OutputFormat with an OutputFormat
 * with an augmented key that writes to partitions using MultipleOutputs (or something similar)
 */
package model.hadoop

import model.hadoop.HadoopIO.MultipleOutputer
import model.hadoop.HadoopIO.MultipleOutputer._

import org.apache.hadoop.io.{DataInputBuffer, NullWritable}
import org.apache.hadoop.mapred.RawKeyValueIterator
import org.apache.hadoop.mapreduce.counters.GenericCounter
import org.apache.hadoop.mapreduce.lib.output.{LazyOutputFormat, MultipleOutputs}
import org.apache.hadoop.mapreduce.task.ReduceContextImpl
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl.DummyReporter
import org.apache.hadoop.mapreduce.{Job, _}
import org.apache.hadoop.util.Progress

object MultipleOutputsFormat {
  // Type inference fails with this inlined in constructor parameters
  private def defaultMultipleOutputsMaker[K, V](io: TaskInputOutputContext[_, _, K, V]): MultipleOutputer[K, V] =
    new MultipleOutputs[K, V](io)
}
/**
 * Subclass this to create an OutputFormat that produces multiple outputs. The subclass must have a nullary
 * constructor so Hadoop can construct it with `.newInstance`, ugh...
 *
 * The output format expects a two-part key of the form (outputPath, actualKey); the string outputPath will be used to
 * partition the output into different directories ('/'-separated filenames).
 *
 * For some reason MultipleOutputs does not work with Avro, but the near-identical AvroMultipleOutputs does.
 * Irritatingly, these obviously related classes have no common ancestor, so they are combined under the
 * MultipleOutputer type class, which at least allows for future extension.
 *
 * @param outputFormat the underlying OutputFormat that performs the actual writes
 * @param multipleOutputsMaker factory method for constructing an object implementing the MultipleOutputer trait
 * @tparam K key type of the underlying OutputFormat
 * @tparam V value type of the underlying OutputFormat
 */
abstract class MultipleOutputsFormat[K, V]
  (outputFormat: OutputFormat[K, V],
   multipleOutputsMaker: TaskInputOutputContext[_, _, K, V] => MultipleOutputer[K, V] =
     (r: TaskInputOutputContext[_, _, K, V]) => MultipleOutputsFormat.defaultMultipleOutputsMaker[K, V](r))
  extends OutputFormat[(String, K), V] {

  override def checkOutputSpecs(context: JobContext): Unit = outputFormat.checkOutputSpecs(context)

  override def getOutputCommitter(context: TaskAttemptContext): OutputCommitter = outputFormat
    .getOutputCommitter(context)

  override def getRecordWriter(context: TaskAttemptContext): RecordWriter[(String, K), V] =
    new RecordWriter[(String, K), V] {
      val job = Job.getInstance(context.getConfiguration)
      // Set underlying output format using lazy output
      LazyOutputFormat.setOutputFormatClass(job, outputFormat.getClass)
      // We pass a ReduceContext with most fields dummied-out since they will not be used in the context
      // of Spark's saveAs*Hadoop* methods
      val ioContext = new ReduceContextImpl(job.getConfiguration, context.getTaskAttemptID,
        new DummyIterator, new GenericCounter, new GenericCounter,
        new DummyRecordWriter, new DummyOutputCommitter, new DummyReporter, null,
        classOf[NullWritable], classOf[NullWritable])

      val multipleOutputs: MultipleOutputer[K, V] = multipleOutputsMaker(ioContext)

      override def write(keys: (String, K), value: V): Unit = {
        keys match {
          case (path, key) =>
            multipleOutputs.write(key, value, path)
        }
      }

      override def close(context: TaskAttemptContext): Unit = multipleOutputs.close()
    }

  private class DummyOutputCommitter extends OutputCommitter {
    override def setupJob(jobContext: JobContext): Unit = ()
    override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = false
    override def setupTask(taskContext: TaskAttemptContext): Unit = ()
    override def commitTask(taskContext: TaskAttemptContext): Unit = ()
    override def abortTask(taskContext: TaskAttemptContext): Unit = ()
  }

  private class DummyRecordWriter extends RecordWriter[K, V] {
    override def write(key: K, value: V): Unit = ()
    override def close(context: TaskAttemptContext): Unit = ()
  }

  private class DummyIterator extends RawKeyValueIterator {
    override def getKey: DataInputBuffer = null
    override def getValue: DataInputBuffer = null
    override def getProgress: Progress = null
    override def close(): Unit = ()
    override def next: Boolean = true
  }
}
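
// Example (not part of the original gist): a minimal sketch of the call pattern from Spark, using the
// MultipleTextOutputsFormat subclass defined further down. The first element of the key tuple chooses the
// sub-directory under the job output path, exactly as in the tests at the end of this gist:
//
//   rdd.map { case (dir, text) => ((dir, NullWritable.get), new Text(text)) }
//      .saveAsNewAPIHadoopFile[MultipleTextOutputsFormat]("/base/output/path")
//
// so records keyed with ("fruit/items", ...) should end up in files like /base/output/path/fruit/items-r-*.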
/**
 * Code below gives some more context to the MultipleOutputsFormat class
 */

// This type class wraps MultipleOutputs-style writers
package model.hadoop

import org.apache.avro.mapreduce.AvroMultipleOutputs
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs

object HadoopIO {
  /**
   * This is a work-around for the fact that Avro has its own, non-type-compatible version of MultipleOutputs.
   * It also allows the functionality of MultipleOutputsFormat to be extended to other multiple-output writers by
   * providing this missing interface as a trait.
   */
  trait MultipleOutputer[K, V] {
    def write(key: K, value: V, path: String): Unit
    def close(): Unit
  }

  object MultipleOutputer {

    implicit class AvroMultipleOutputer[K, V](mo: AvroMultipleOutputs) extends MultipleOutputer[K, V] {
      def write(key: K, value: V, path: String): Unit = mo.write(key, value, path)
      def close(): Unit = mo.close()
    }

    implicit class PlainMultipleOutputer[K, V](mo: MultipleOutputs[K, V]) extends MultipleOutputer[K, V] {
      def write(key: K, value: V, path: String): Unit = mo.write(key, value, path)
      def close(): Unit = mo.close()
    }
  }
}
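
// Example (not part of the original gist): with MultipleOutputer._ in scope, both writer classes satisfy the
// same trait via the implicit wrappers above, which is what lets MultipleOutputsFormat stay agnostic to them:
//
//   val plain: MultipleOutputer[Text, NullWritable]       = new MultipleOutputs[Text, NullWritable](context)
//   val avro:  MultipleOutputer[AvroKey[T], NullWritable] = new AvroMultipleOutputs(context)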
// SPECIFIC MULTIPLE OUTPUT FORMAT CLASSES

// For Parquet
package model.hadoop

import org.apache.avro.generic.GenericContainer
import parquet.hadoop.ParquetOutputFormat

class MultipleParquetOutputsFormat[T <: GenericContainer]
  extends MultipleOutputsFormat(new ParquetOutputFormat[T]) {
}

// For Avro
package model.hadoop

import model.hadoop.HadoopIO.MultipleOutputer
import org.apache.avro.generic.GenericContainer
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.{AvroKeyOutputFormat, AvroMultipleOutputs}
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.TaskInputOutputContext
object MultipleAvroOutputsFormat {
  // This seems to be an unfortunate limitation of type inference for lambda defaults within constructor params.
  // If it worked, I would just inline this function.
  def amoMaker[T](io: TaskInputOutputContext[_, _, AvroKey[T], NullWritable]):
      MultipleOutputer[AvroKey[T], NullWritable] = new AvroMultipleOutputs(io)
}
class MultipleAvroOutputsFormat[T <: GenericContainer]
  extends MultipleOutputsFormat(new AvroKeyOutputFormat[T],
    (io: TaskInputOutputContext[_, _, AvroKey[T], NullWritable]) => MultipleAvroOutputsFormat.amoMaker(io)) {
}

// For Text
package model.hadoop

import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

class MultipleTextOutputsFormat extends MultipleOutputsFormat(new TextOutputFormat[NullWritable, Text]) {
}
// END OF SPECIFIC MULTIPLE OUTPUTS FORMATS

// These extensions show how to read and write Avro RDDs and use MultipleOutputsFormat
package model.spark.extensions

import model.hadoop.{MultipleAvroOutputsFormat, MultipleParquetOutputsFormat, MultipleTextOutputsFormat}
import model.util.TextLike
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.{AvroJob, AvroKeyInputFormat, AvroKeyOutputFormat}
import org.apache.avro.specific.SpecificData
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapreduce.Job
import org.apache.log4j.Logger
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import parquet.avro.{AvroParquetOutputFormat, AvroWriteSupport}
import parquet.hadoop.ParquetOutputFormat

import scala.reflect.{ClassTag, _}
object SerializationExtensions {
  val logger = Logger.getLogger(getClass)

  def avroParquetJob[T: ClassTag](job: Job = Job.getInstance(new Configuration())): Job = {
    val schema: Schema = SpecificData.get.getSchema(classTag[T].runtimeClass)
    AvroJob.setInputKeySchema(job, schema)
    AvroJob.setOutputKeySchema(job, schema)
    ParquetOutputFormat.setWriteSupportClass(job, classOf[AvroWriteSupport])
    ParquetOutputFormat.setEnableDictionary(job, false)
    AvroParquetOutputFormat.setSchema(job, schema)
    job
  }

  def definedOrLog(record: GenericRecord, field: String): Boolean = {
    if (record.get(field) != null) return true
    logger.warn(s"Expected field '$field' to be defined, but it was not on record of type '${record.getClass}'")
    false
  }

  implicit class TextKeyRDD[K, V: TextLike](rdd: RDD[(K, V)]) {

    def saveAsMultipleTextFiles(outputKeyFunction: (K) => String, outputPath: String): Unit = {
      rdd.map { case (k, v) => ((outputKeyFunction(k), NullWritable.get), v) }.map {
        case (k, v: String) => (k, new Text(v))
        case (k, v: Array[Byte]) => (k, new Text(v))
        case (k, v: Text) => (k, new Text(v))
      }.saveAsNewAPIHadoopFile[MultipleTextOutputsFormat](outputPath)
    }
  }
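
  // Note (not part of the original gist): model.util.TextLike is imported above but not included here. Judging by
  // the pattern match in saveAsMultipleTextFiles, it is presumably a simple marker type class restricting V to
  // text-convertible types; a minimal sketch under that assumption might look like:
  //
  //   trait TextLike[T]
  //   object TextLike {
  //     implicit object StringTextLike extends TextLike[String]
  //     implicit object BytesTextLike extends TextLike[Array[Byte]]
  //     implicit object TextTextLike extends TextLike[Text]
  //   }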
  implicit class AvroRDDSparkContext(val sparkContext: SparkContext) extends AnyVal {

    def avroFile[T: ClassTag](path: String): RDD[T] = {
      sparkContext.newAPIHadoopFile(path,
        classOf[AvroKeyInputFormat[T]],
        classOf[AvroKey[T]],
        classOf[NullWritable],
        avroParquetJob().getConfiguration)
        .map[T](_._1.datum())
    }
  }

  implicit class AvroRDD[T <: GenericRecord : ClassTag](val avroRDD: RDD[T]) {

    /**
     * Filters Avro records with certain fields not defined (are null) and logs the fact
     */
    def filterIfUnexpectedNull(fields: String*): RDD[T] = {
      avroRDD.filter(r => fields.forall(definedOrLog(r, _)))
    }

    def saveAsParquetFile(outputPath: String): Unit = {
      avroRDD.map((null, _))
        .saveAsNewAPIHadoopFile(outputPath,
          classOf[Null],
          classTag[T].runtimeClass,
          classOf[ParquetOutputFormat[T]],
          avroParquetJob[T]().getConfiguration)
    }

    def saveAsMultipleParquetFiles(outputKeyFunction: (T) => String, outputPath: String): Unit = {
      avroRDD.map(r => ((outputKeyFunction(r), null), r))
        .saveAsNewAPIHadoopFile(outputPath,
          classOf[Null],
          classTag[T].runtimeClass,
          classOf[MultipleParquetOutputsFormat[T]],
          avroParquetJob[T]().getConfiguration)
    }

    def saveAsAvroFile(outputPath: String): Unit = {
      avroRDD.map(r => (new AvroKey[T](r), NullWritable.get))
        .saveAsNewAPIHadoopFile(outputPath,
          classOf[AvroKey[T]],
          classOf[NullWritable],
          classOf[AvroKeyOutputFormat[T]],
          avroParquetJob[T]().getConfiguration)
    }

    def saveAsMultipleAvroFiles(outputKeyFunction: (T) => String, outputPath: String): Unit = {
      avroRDD.map(r => ((outputKeyFunction(r), new AvroKey(r)), NullWritable.get))
        .saveAsNewAPIHadoopFile(outputPath,
          classOf[AvroKey[(String, T)]],
          classOf[NullWritable],
          classOf[MultipleAvroOutputsFormat[T]],
          avroParquetJob[T]().getConfiguration)
    }
  }
}
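
// Note (not part of the original gist): model.spark.extensions.SparkConfigurationExtensions is not included
// either. Judging by its use in the test below, it enriches SparkConf with a configureAvroSerialization() method;
// a plausible sketch, assuming it simply switches the job to Kryo serialization so Avro records can be shuffled:
//
//   implicit class ConfigurableSparkConf(val conf: SparkConf) extends AnyVal {
//     def configureAvroSerialization(): SparkConf =
//       conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
//   }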
package model.hadoop

import java.io.File
import java.util.UUID

import com.google.common.io.Files
import avro.UUIDUtils
import avro.geoip.{Lookup, Subdivision}
import avro.gulp.UsageFragment
import model.spark.extensions.SerializationExtensions._
import model.spark.extensions.SparkConfigurationExtensions._
import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfterAll, FunSuite}

import scala.collection.JavaConverters._

object TestUtil {
  def mkPath(ks: Any*): String = {
    def keyMap(k: Any): String = k match {
      case xs: TraversableOnce[_] => xs.mkString(",")
      case id: avro.UUID => UUIDUtils.toJavaUUID(id).toString
      case x => x.toString
    }
    ks.map(keyMap).mkString("/")
  }
}
class MultipleOutputsFormatTest extends FunSuite with BeforeAndAfterAll {
  val localAvroFile = "/home/silas/ephemera/filtered-frag-delete.avro"

  val sparkConf = new SparkConf()
    .setMaster("local")
    .setAppName("Multiple Outputs Test")
    .configureAvroSerialization()

  val sc = new SparkContext(sparkConf)

  val outputPath = "tmp"
  val outputFile = new File(outputPath)
  outputFile.mkdirs()
  Files.fileTreeTraverser().postOrderTraversal(outputFile).asScala.foreach(_.delete)

  def usageFragment(user: String, lang: String, country: String): UsageFragment =
    UsageFragment.newBuilder()
      .setId(UUIDUtils.fromJavaUUID(UUID.fromString(user)))
      .setCloudUserId(UUIDUtils.fromJavaUUID(UUID.randomUUID()))
      .setSubmissionTimestamp(System.currentTimeMillis())
      .setEndTimestamp(System.currentTimeMillis() + 10000)
      .setEnabledLanguages(List(lang).asJava)
      .setGeoIP(Lookup.newBuilder()
        .setCountry(country)
        .setSubdivisions(List(Subdivision.newBuilder()
          .setName("foo")
          .build()).asJava).build()).build()

  val usageFragments = List[UsageFragment](
    usageFragment("3ebb0e0d-891e-4675-9ec9-3f3665195048", "en_GB", "UK"),
    usageFragment("3ebb0e0d-891e-4675-9ec9-3f3665195048", "fr_FR", "France"),
    usageFragment("3ebb0e0d-891e-4675-9ec9-3f3665195048", "fr_FR", "France"),
    usageFragment("3ebb0e0d-891e-4675-9ec9-3f3665195048", "fr_FR", "France"),
    usageFragment("ea7fbcce-98ea-4e44-97b2-d4783a95a700", "en_GB", "UK"),
    usageFragment("ea7fbcce-98ea-4e44-97b2-d4783a95a700", "es_LA", "UK"),
    usageFragment("ea7fbcce-98ea-4e44-97b2-d4783a95a700", "es_LA", "UK"),
    usageFragment("ea7fbcce-98ea-4e44-97b2-d4783a95a700", "es_LA", "UK"),
    usageFragment("af415c0a-aa72-4f91-8118-9e8eb377cfc2", "es_EC", "Ecuador"),
    usageFragment("af415c0a-aa72-4f91-8118-9e8eb377cfc2", "es_GT", "Guatemala"),
    usageFragment("af415c0a-aa72-4f91-8118-9e8eb377cfc2", "es_GT", "Guatemala"),
    usageFragment("7f45d45c-2e10-4355-9ea7-69b99bbcd4fa", "en_GB", "UK"),
    usageFragment("7f45d45c-2e10-4355-9ea7-69b99bbcd4fa", "fr_FR", "UK"),
    usageFragment("ea7fbcce-98ea-4e44-97b2-d4783a95a700", "en_GB", "UK")
  )

  test("Test multiple text outputs") {
    val values = sc.parallelize(List(
      ("fruit/items", "apple"),
      ("vegetable/items", "broccoli"),
      ("fruit/items", "pear"),
      ("fruit/items", "peach"),
      ("vegetable/items", "celery"),
      ("vegetable/items", "spinach")
    ))

    values.saveAsMultipleTextFiles(s => s, s"$outputPath/text")
  }

  test("Test multiple avro outputs") {
    val values = sc.parallelize(usageFragments)

    values.saveAsMultipleAvroFiles(uf =>
      TestUtil.mkPath(uf.getGeoIP.getCountry, uf.getCloudUserId),
      s"$outputPath/avro")
  }

  test("Test multiple parquet outputs") {
    val values = sc.parallelize(usageFragments)

    values.saveAsMultipleParquetFiles(uf =>
      TestUtil.mkPath(uf.getEnabledLanguages.asScala, uf.getCloudUserId, uf.getGeoIP.getCountry),
      s"$outputPath/parquet")
  }

  override protected def afterAll(): Unit = sc.stop()
}