Hadoop HDFS Sequence File Writer with Static Typing Validation.
/*
* Copyright 2012-2013 Bernardo Gomez Palacio.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.berngp.thriftexample
import collection.immutable.TreeMap
import collection.mutable
import collection.mutable.ArrayBuffer
import java.{util => jutil}
import net.liftweb.common._
import org.apache.hadoop.conf.{Configuration => HadoopConf}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.SequenceFile.Metadata
import org.apache.hadoop.io.SequenceFile.Writer
import org.apache.hadoop.io._
import org.apache.hadoop.util.Progressable
import scala.collection.JavaConversions._
object HadoopSequenceFileWriter {
case class HadoopSequenceFileWriterRecipe private[HadoopSequenceFileWriter](hadoopConf: HadoopConf, options: Seq[Writer.Option])
extends net.liftweb.util.ControlHelpers with Logger {
def asSequenceFileWriter(): Box[SequenceFile.Writer] = try {
Full(SequenceFile.createWriter(hadoopConf, options: _*))
} catch {
case iae: IllegalArgumentException =>
Failure("Illegal Argument found for the Sequence File Writer!", Full(iae), None)
case e: Exception =>
Failure("Unable to create a Sequence File Writer!", Full(e), None)
}
def doWithSequenceFileWriter[A](f: SequenceFile.Writer => A): Box[A] = {
asSequenceFileWriter() match {
case Full(writer) =>
val fBox = tryo[A](f(writer))
tryo(writer.close()) match {
case f: Failure =>
warn("Failed to close the SequenceFile.Writer silently!", f)
case _ =>
}
fBox
case f: Failure =>
f
case Empty =>
Empty
}
}
}
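/* A minimal usage sketch of the loan pattern above (`recipe` is a
 * hypothetical, fully configured `HadoopSequenceFileWriterRecipe`):
 *
 * {{{
 * recipe.doWithSequenceFileWriter { writer =>
 *   writer.append(NullWritable.get(), new Text("payload"))
 * } match {
 *   case Full(_)    => // appended; the writer was closed for us
 *   case f: Failure => // inspect f.msg / f.exception
 *   case Empty      => // no writer could be produced
 * }
 * }}}
 */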
/**
 * Phantom type used to mark a required attribute of the hdfsWriter builder.
 * The builder carries one `BUILDER_REQ` type parameter per required attribute;
 * each starts as `MISSING` and is rewritten to `PRESENT` by the corresponding
 * `with...` method. Because `build()` is only enabled (via an implicit view) on
 * `HadoopSequenceFileWriterBuilder[PRESENT, PRESENT]`, a missing required
 * attribute is a compile-time error rather than a runtime failure.
 */
abstract class BUILDER_REQ
/** Marks a required attribute that has been supplied. */
abstract class PRESENT extends BUILDER_REQ
/** Marks a required attribute that has not yet been supplied. */
abstract class MISSING extends BUILDER_REQ
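/* A sketch of the compile-time guarantee (`conf` is a hypothetical
 * HadoopConf; the first two lines are shown only to illustrate what the
 * compiler rejects):
 *
 * {{{
 * hdfsWriter().build()                                     // does not compile: HadoopConf MISSING
 * hdfsWriter().withHadoopConf(conf).build()                // does not compile: file MISSING
 * hdfsWriter().withHadoopConf(conf).withFile("f").build()  // compiles: both PRESENT
 * }}}
 */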
class HadoopSequenceFileWriterBuilder[HC <: BUILDER_REQ, HF <: BUILDER_REQ](val theHadoopConf: HadoopConf = null,
val theFile: Option[Either[String, Path]] = None,
val theMeta: Option[TreeMap[String, String]] = None,
val theBufferSize: Option[Int] = None,
val theBlockSize: Option[Long] = None,
val theCompression: Option[CompressionType] = Some(CompressionType.BLOCK),
val theReplication: Option[Short] = Some(0),
val theKeyClass: Option[Class[_]] = Some(classOf[NullWritable]),
val theValueClass: Option[Class[_]] = Some(classOf[Text]),
val theProgressableReporter: Option[Progressable] = None) {
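/** Internal copy helper: yields a ten-argument constructor reference so each
  * `with...` method can re-type a single `BUILDER_REQ` parameter while
  * threading every current value through unchanged. */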
private def _builder[HC <: BUILDER_REQ, HF <: BUILDER_REQ] = {
new HadoopSequenceFileWriterBuilder[HC, HF](_, _, _, _, _, _, _, _, _, _)
}
def withHadoopConf(c: HadoopConf) =
_builder[PRESENT, HF](c, theFile, theMeta, theBufferSize, theBlockSize, theCompression, theReplication, theKeyClass, theValueClass, theProgressableReporter)
def withFile(f: Either[String, Path]): HadoopSequenceFileWriterBuilder[HC, PRESENT] =
_builder[HC, PRESENT](theHadoopConf, Some(f), theMeta, theBufferSize, theBlockSize, theCompression, theReplication, theKeyClass, theValueClass, theProgressableReporter)
def withFile(f: String): HadoopSequenceFileWriterBuilder[HC, PRESENT] = withFile(Left(f))
def withFile(f: Path): HadoopSequenceFileWriterBuilder[HC, PRESENT] = withFile(Right(f))
def withMeta(m: TreeMap[String, String]) =
_builder[HC, HF](theHadoopConf, theFile, Some(m), theBufferSize, theBlockSize, theCompression, theReplication, theKeyClass, theValueClass, theProgressableReporter)
def withBufferSize(b: Int) =
_builder[HC, HF](theHadoopConf, theFile, theMeta, Some(b), theBlockSize, theCompression, theReplication, theKeyClass, theValueClass, theProgressableReporter)
def withBlockSize(b: Long) =
_builder[HC, HF](theHadoopConf, theFile, theMeta, theBufferSize, Some(b), theCompression, theReplication, theKeyClass, theValueClass, theProgressableReporter)
def withCompression(c: CompressionType) =
_builder[HC, HF](theHadoopConf, theFile, theMeta, theBufferSize, theBlockSize, Some(c), theReplication, theKeyClass, theValueClass, theProgressableReporter)
def withReplication(r: Short) =
_builder[HC, HF](theHadoopConf, theFile, theMeta, theBufferSize, theBlockSize, theCompression, Some(r), theKeyClass, theValueClass, theProgressableReporter)
def withKeyClass(k: Class[_]) =
_builder[HC, HF](theHadoopConf, theFile, theMeta, theBufferSize, theBlockSize, theCompression, theReplication, Some(k), theValueClass, theProgressableReporter)
def withValueClass(v: Class[_]) =
_builder[HC, HF](theHadoopConf, theFile, theMeta, theBufferSize, theBlockSize, theCompression, theReplication, theKeyClass, Some(v), theProgressableReporter)
def withProgressable(p: Progressable) =
_builder[HC, HF](theHadoopConf, theFile, theMeta, theBufferSize, theBlockSize, theCompression, theReplication, theKeyClass, theValueClass, Some(p))
def options() = {
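// Translate every defined attribute into its SequenceFile.Writer.Option counterpart.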
val seqBuffer: ArrayBuffer[Writer.Option] = mutable.ArrayBuffer()
theFile.foreach {
  case Left(fileStr) => seqBuffer += Writer.file(new Path(fileStr))
  case Right(path) => seqBuffer += Writer.file(path)
}
theMeta.foreach(m =>
  seqBuffer += Writer.metadata(new Metadata(new jutil.TreeMap(m.map(v => new Text(v._1) -> new Text(v._2))))))
theBufferSize.foreach(b => seqBuffer += Writer.bufferSize(b))
theBlockSize.foreach(b => seqBuffer += Writer.blockSize(b))
theCompression.foreach(c => seqBuffer += Writer.compression(c))
theReplication.foreach(r => seqBuffer += Writer.replication(r))
theKeyClass.foreach(k => seqBuffer += Writer.keyClass(k))
theValueClass.foreach(v => seqBuffer += Writer.valueClass(v))
theProgressableReporter.foreach(p => seqBuffer += Writer.progressable(p))
seqBuffer.toSeq
}
}
implicit class EnableSequenceFileWriterBuilder(builder: HadoopSequenceFileWriterBuilder[PRESENT, PRESENT]) {
  def build() =
    new HadoopSequenceFileWriterRecipe(builder.theHadoopConf, builder.options())
}
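/** Entry point: a fresh builder with both required attributes still `MISSING`. */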
def hdfsWriter() =
new HadoopSequenceFileWriterBuilder[MISSING, MISSING]()
implicit def stringToText(string: String): Text = new Text(string)
implicit class NilToHadoopWritable(nil: List[Nothing]) {
def toWritable() = NullWritable.get()
}
implicit class StringToHadoopWritable(string: String) {
def toWritable() = new Text(string)
def toText() = toWritable()
}
implicit class LongToHadoopWritable(long: Long) {
def toWritable() = new LongWritable(long)
}
implicit class ShortToHadoopWritable(short: Short) {
def toWritable() = new ShortWritable(short)
}
implicit class MapToHadoopWritable[K <: Writable, V <: Writable](map: Map[K, V]) {
def toWritable() = {
val mapWritable = new MapWritable()
mapWritable.putAll(map)
mapWritable
}
}
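/* Usage sketch for the Writable enrichments above (values are illustrative):
 *
 * {{{
 * "key".toWritable()   // Text
 * 42L.toWritable()     // LongWritable
 * Nil.toWritable()     // NullWritable.get()
 * Map(new Text("a") -> new LongWritable(1L)).toWritable() // MapWritable
 * }}}
 */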
}
/*
* Copyright 2012-2013 Bernardo Gomez Palacio.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.berngp.thriftexample
import net.liftweb.common.Failure
import org.apache.hadoop.conf.{Configuration => HadoopConf}
import org.apache.hadoop.fs.{FileSystem => HadoopFileSystem, LocalFileSystem, Path}
import org.apache.hadoop.io.SequenceFile
import org.scalatest._
import org.scalatest.matchers.ShouldMatchers._
import thrift.example.{BinPacket => ThriftBinPacket}
class NetPacketAsThriftFilesSpec extends FlatSpec with SequentialNestedSuiteExecution {
behavior of "Interaction between a Net Packet Traffic Generator and Thrift SequenceFile HDFS Writers"
import HadoopSequenceFileWriter._
import NetPacketThriftGateway._
import NetPacketTrafficGenerator._
object buffer {
var builderBox: Option[NetTrafficPacketPlanBuilder[_ <: NetPacketTrafficGenerator.BUILDER_REQ, _ <: NetPacketTrafficGenerator.BUILDER_REQ]] = None
var thrifts: Option[Seq[ThriftBinPacket]] = None
}
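// Register both the stock WritableSerialization and this project's
// ThriftSerialization so SequenceFiles can carry raw ThriftBinPacket values.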
def hadoopConf() = {
val conf = new HadoopConf()
conf.set("io.serializations", Seq(
  "org.apache.hadoop.io.serializer.WritableSerialization",
  "com.github.berngp.thriftexample.ThriftSerialization"
).mkString(","))
conf
}
object fixtures {
val runIdentifier = java.util.UUID.randomUUID
val destinationAddresses = (1 to 10).map("192.168.1." + _).map(IPv4Address(_)).map(_.get).toSet
val sourceAddresses = (1 to 10).map("192.168.100." + _).map(IPv4Address(_)).map(_.get).toSet
val timeSeriesSize = 10
val conf = hadoopConf()
val fs = new LocalFileSystem(HadoopFileSystem.get(conf))
val seqFilesDir = new Path(s"./target/hdfs/net-packets-thrift/${runIdentifier}")
}
it should "instantiate a hdfsWriter" in {
buffer.builderBox = Some(builder())
buffer.builderBox should be('defined)
}
it should "be cable to generate a Time Series and encode it to Thrift Objects" in {
val b = buffer.builderBox.get
val g = b
.withDestinationsAddresses(fixtures.destinationAddresses)
.withSourceAddresses(fixtures.sourceAddresses)
.withSize(fixtures.timeSeriesSize)
.withVoter(Voting.constant)
.build()
val p = g.getPlan
val thrifts = p.timeSeries.map(_.asThriftBox()).filter(_.isDefined).map(_.get)
thrifts.size should be(p.timeSeries.size)
buffer.thrifts = Some(thrifts)
}
it should "persist the Thrift Objects into Sequence Files using the ThriftSerialization" in {
val b = hdfsWriter() withHadoopConf fixtures.conf withValueClass classOf[ThriftBinPacket]
buffer.thrifts.get.view.zipWithIndex.foreach {
  case (t, index) =>
    val a = b withFile s"${fixtures.seqFilesDir}/ThriftBinPacket/${index}.seq"
    a.build().doWithSequenceFileWriter {
      writer =>
        writer.append(Nil.toWritable(), t)
} match {
case f: Failure =>
fail(f.msg, f.exception.openOr(new IllegalStateException("Exception expected!")))
case _ =>
}
}
}
it should "read the Thrift Objects contained in the SequenceFiles using the ThriftSerialization" in {
buffer.thrifts.get.view.zipWithIndex.foreach {
case (t, index) =>
val path = s"${fixtures.seqFilesDir}/ThriftBinPacket/${index}.seq"
val optFile = SequenceFile.Reader.file(new Path(path))
val seqReader = new SequenceFile.Reader(fixtures.conf, optFile)
val read = seqReader.next(Nil.toWritable())
read should be(true)
val stored = new ThriftBinPacket()
seqReader.getCurrentValue(stored)
seqReader.close()
stored should be(t)
}
}
it should "persist the Thrift Objects into SequenceFiles usintg the WritableThriftBinPacket" in {
val filePath = s"${fixtures.seqFilesDir}/WritableThriftBinPacket"
info(s"Writting to path ${filePath}")
// Configure the Builder
val b = hdfsWriter() withHadoopConf fixtures.conf withValueClass classOf[WritableThriftBinPacket]
buffer.thrifts.get.view.zipWithIndex.foreach {
case (t, index) =>
val a = b withFile s"${filePath}/${index}.seq"
a.build().doWithSequenceFileWriter {
writer =>
writer.append(Nil.toWritable(), t.toWritable())
writer.hsync()
} match {
case f: Failure =>
fail(f.msg, f.exception.openOr(new IllegalStateException("Exception expected!")))
case _ =>
}
}
}
it should "read the SequenceFiles using the WritableThriftBinPacket" in {
val filePath = s"${fixtures.seqFilesDir}/WritableThriftBinPacket"
info(s"Reading files from path ${filePath}")
buffer.thrifts.get.view.zipWithIndex.foreach {
case (t, index) =>
val path = s"${filePath}/${index}.seq"
val optFile = SequenceFile.Reader.file(new Path(path))
val seqReader = new SequenceFile.Reader(fixtures.conf, optFile)
val read = seqReader.next(Nil.toWritable())
read should be(true)
val stored = new WritableThriftBinPacket()
seqReader.getCurrentValue(stored)
seqReader.close()
stored should be(t)
}
}
}