@daithiocrualaoich
Created December 18, 2012 16:03
Using the Hadoop Java API from Scala.
package com.gu.hadoop

import org.apache.hadoop.util.GenericOptionsParser

object ApplicationParameters {
  def apply(args: Array[String]): List[String] = {
    new GenericOptionsParser(args).getRemainingArgs.toList
  }
}
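
// Usage sketch (paths hypothetical): GenericOptionsParser consumes the standard
// Hadoop options (-conf, -D, -fs, -jt, -files, -libjars, -archives) and
// getRemainingArgs returns whatever is left, so for
//
//   hadoop jar wordcount.jar -D some.property=value /input /output
//
// ApplicationParameters(args) yields List("/input", "/output").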
package com.gu.hadoop

import org.apache.hadoop.mapreduce._
import org.apache.hadoop.conf.Configuration

object HadoopJob {
  // Using typesafe builder pattern: http://blog.rafaelferreira.net/2008/07/type-safe-builder-pattern-in-scala.html

  // Some type aliases to reduce the amount of Java generic underscore nonsense
  type RawInputFormat = InputFormat[_, _]
  type RawOutputFormat = OutputFormat[_, _]
  type RawMapper = Mapper[_, _, _, _]
  type RawReducer = Reducer[_, _, _, _]

  // TODO: Ensure input/mapper/combiner/reducer/output types are compatible at compile time
  abstract class TRUE
  abstract class FALSE
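
  // Sketch of the phantom-type trick (simplified, names hypothetical): each type
  // parameter of JobSpecification below is a compile-time flag that flips from
  // FALSE to TRUE when the corresponding setter is called, e.g.
  //
  //   class Spec[HasName](val name: Option[String] = None) {
  //     def withName(n: String) = new Spec[TRUE](Some(n))
  //   }
  //   implicit def buildable(spec: Spec[TRUE]) = new { def build() = spec.name.get }
  //
  // so build() is only available on specifications whose types prove the
  // required setters have been called.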
  class JobSpecification[HasConf, HasJC, HasIF, HasM, HasMO, HasC, HasR, HasO, HasOF](
      val name: String,
      val configuration: Option[Configuration] = None,
      val jarClass: Option[Class[_]] = None,
      val inputFormatClass: Option[Class[_ <: RawInputFormat]] = None,
      val mapperClass: Option[Class[_ <: RawMapper]] = None,
      val mapOutputKeyClass: Option[Class[_]] = None,
      val mapOutputValueClass: Option[Class[_]] = None,
      val combinerClass: Option[Class[_ <: RawReducer]] = None,
      val reducerClass: Option[Class[_ <: RawReducer]] = None,
      val outputKeyClass: Option[Class[_]] = None,
      val outputValueClass: Option[Class[_]] = None,
      val outputFormatClass: Option[Class[_ <: RawOutputFormat]] = None) {

    def withConf(conf: Configuration) = {
      new JobSpecification[TRUE, HasJC, HasIF, HasM, HasMO, HasC, HasR, HasO, HasOF](
        name,
        Some(conf),
        jarClass,
        inputFormatClass,
        mapperClass,
        mapOutputKeyClass,
        mapOutputValueClass,
        combinerClass,
        reducerClass,
        outputKeyClass,
        outputValueClass,
        outputFormatClass
      )
    }

    def withJarClass[J]()(implicit mj: Manifest[J]) = {
      new JobSpecification[HasConf, TRUE, HasIF, HasM, HasMO, HasC, HasR, HasO, HasOF](
        name,
        configuration,
        Some(mj.erasure),
        inputFormatClass,
        mapperClass,
        mapOutputKeyClass,
        mapOutputValueClass,
        combinerClass,
        reducerClass,
        outputKeyClass,
        outputValueClass,
        outputFormatClass
      )
    }

    def withInputFormat[F <: RawInputFormat]()(implicit mf: Manifest[F]) = {
      new JobSpecification[HasConf, HasJC, TRUE, HasM, HasMO, HasC, HasR, HasO, HasOF](
        name,
        configuration,
        jarClass,
        Some(mf.erasure.asInstanceOf[Class[RawInputFormat]]),
        mapperClass,
        mapOutputKeyClass,
        mapOutputValueClass,
        combinerClass,
        reducerClass,
        outputKeyClass,
        outputValueClass,
        outputFormatClass
      )
    }

    def withMapper[M <: RawMapper]()(implicit mm: Manifest[M]) = {
      new JobSpecification[HasConf, HasJC, HasIF, TRUE, HasMO, HasC, HasR, HasO, HasOF](
        name,
        configuration,
        jarClass,
        inputFormatClass,
        Some(mm.erasure.asInstanceOf[Class[RawMapper]]),
        mapOutputKeyClass,
        mapOutputValueClass,
        combinerClass,
        reducerClass,
        outputKeyClass,
        outputValueClass,
        outputFormatClass
      )
    }
    def withMapOutput[K, V]()(implicit mk: Manifest[K], mv: Manifest[V]) = {
      // MapOutput is optional but needed with combiners
      new JobSpecification[HasConf, HasJC, HasIF, HasM, TRUE, HasC, HasR, HasO, HasOF](
        name,
        configuration,
        jarClass,
        inputFormatClass,
        mapperClass,
        Some(mk.erasure),
        Some(mv.erasure),
        combinerClass,
        reducerClass,
        outputKeyClass,
        outputValueClass,
        outputFormatClass
      )
    }

    def withCombiner[R <: RawReducer]()(implicit mr: Manifest[R]) = {
      new JobSpecification[HasConf, HasJC, HasIF, HasM, HasMO, TRUE, HasR, HasO, HasOF](
        name,
        configuration,
        jarClass,
        inputFormatClass,
        mapperClass,
        mapOutputKeyClass,
        mapOutputValueClass,
        Some(mr.erasure.asInstanceOf[Class[RawReducer]]),
        reducerClass,
        outputKeyClass,
        outputValueClass,
        outputFormatClass
      )
    }

    def withReducer[R <: RawReducer]()(implicit mr: Manifest[R]) = {
      new JobSpecification[HasConf, HasJC, HasIF, HasM, HasMO, HasC, TRUE, HasO, HasOF](
        name,
        configuration,
        jarClass,
        inputFormatClass,
        mapperClass,
        mapOutputKeyClass,
        mapOutputValueClass,
        combinerClass,
        Some(mr.erasure.asInstanceOf[Class[RawReducer]]),
        outputKeyClass,
        outputValueClass,
        outputFormatClass
      )
    }
    def withOutput[K, V]()(implicit mk: Manifest[K], mv: Manifest[V]) = {
      new JobSpecification[HasConf, HasJC, HasIF, HasM, HasMO, HasC, HasR, TRUE, HasOF](
        name,
        configuration,
        jarClass,
        inputFormatClass,
        mapperClass,
        mapOutputKeyClass,
        mapOutputValueClass,
        combinerClass,
        reducerClass,
        Some(mk.erasure),
        Some(mv.erasure),
        outputFormatClass
      )
    }

    def withOutputFormat[F <: RawOutputFormat]()(implicit mf: Manifest[F]) = {
      new JobSpecification[HasConf, HasJC, HasIF, HasM, HasMO, HasC, HasR, HasO, TRUE](
        name,
        configuration,
        jarClass,
        inputFormatClass,
        mapperClass,
        mapOutputKeyClass,
        mapOutputValueClass,
        combinerClass,
        reducerClass,
        outputKeyClass,
        outputValueClass,
        Some(mf.erasure.asInstanceOf[Class[RawOutputFormat]])
      )
    }
  }
  /*
    Only complete JobSpecifications can be built. The actual build method is
    private and only available by implicit conversion from JobSpecifications
    with correct phantom typing.

    - Combiners are optional but need MapOutputs
    - Mappers, Reducers are optional
  */
  implicit def combinersNeedMapOutputs(jobSpecification: JobSpecification[TRUE, TRUE, TRUE, _, TRUE, TRUE, _, TRUE, TRUE]) = new {
    def build(): Job = buildJob(jobSpecification)
  }

  implicit def otherwiseMapOutputsAreOptional(jobSpecification: JobSpecification[TRUE, TRUE, TRUE, _, FALSE, _, _, TRUE, TRUE]) = new {
    def build(): Job = buildJob(jobSpecification)
  }
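
  // Usage sketch (hypothetical job): the phantom type parameters record which
  // setters have been called, so build() is only in scope for specifications
  // that have at least a Configuration, jar class, input format, output
  // key/value classes, and output format. An incomplete chain simply fails to
  // compile instead of failing at job submission.
  //
  //   HadoopJob("example")
  //     .withConf(new Configuration)
  //     .withJarClass[Example]
  //     .withInputFormat[TextInputFormat]
  //     .withOutput[Text, LongWritable]
  //     .withOutputFormat[TextOutputFormat[Text, LongWritable]]
  //     .build()                     // compiles: all required pieces present
  //
  //   HadoopJob("example").build()   // does not compile: no implicit provides build()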
  private def buildJob(jobSpecification: JobSpecification[TRUE, TRUE, TRUE, _, _, _, _, TRUE, TRUE]): Job = {
    val job = new Job(jobSpecification.configuration.get)
    jobSpecification.jarClass foreach { job setJarByClass _ }

    job setJobName jobSpecification.name

    jobSpecification.inputFormatClass foreach { job setInputFormatClass _ }
    jobSpecification.mapperClass foreach { job setMapperClass _ }
    jobSpecification.combinerClass foreach { job setCombinerClass _ }
    jobSpecification.mapOutputKeyClass foreach { job setMapOutputKeyClass _ }
    jobSpecification.mapOutputValueClass foreach { job setMapOutputValueClass _ }
    jobSpecification.reducerClass foreach { job setReducerClass _ }
    jobSpecification.outputKeyClass foreach { job setOutputKeyClass _ }
    jobSpecification.outputValueClass foreach { job setOutputValueClass _ }
    jobSpecification.outputFormatClass foreach { job setOutputFormatClass _ }

    job
  }

  def apply(name: String) = builder(name)
  def builder(name: String) = new JobSpecification[FALSE, FALSE, FALSE, FALSE, FALSE, FALSE, FALSE, FALSE, FALSE](name)
}
package com.gu.hadoop

import org.apache.hadoop.io.{ LongWritable, IntWritable, Text }

object `package` {
  implicit def any2Text[A](a: Any) = new {
    lazy val toText: Text = new Text(a.toString)
  }

  implicit object LongWritableNumeric extends Numeric[LongWritable] {
    val two = fromInt(2)
    val minusOne = fromInt(-1)

    def plus(n: LongWritable, m: LongWritable): LongWritable = fromLong(toLong(n) + toLong(m))
    def minus(n: LongWritable, m: LongWritable): LongWritable = plus(n, negate(m))
    def times(n: LongWritable, m: LongWritable): LongWritable = fromLong(toLong(n) * toLong(m))
    def negate(n: LongWritable): LongWritable = times(minusOne, n)

    def fromInt(n: Int): LongWritable = fromLong(n)
    def fromLong(n: Long): LongWritable = new LongWritable(n)

    def toInt(n: LongWritable): Int = toLong(n).toInt
    def toLong(n: LongWritable): Long = n.get
    def toFloat(n: LongWritable): Float = toLong(n).toFloat
    def toDouble(n: LongWritable): Double = toLong(n).toDouble

    def compare(n: LongWritable, m: LongWritable) = toLong(n) compare toLong(m)
  }
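
  // Usage sketch: with these implicits in scope (they arrive via
  // `import com.gu.hadoop._`), Hadoop writables behave like ordinary values:
  //
  //   "hello".toText                                      // Text("hello")
  //   List(new LongWritable(2), new LongWritable(3)).sum  // LongWritable(5)
  //   LongWritableNumeric.one                             // LongWritable(1), inherited from Numeric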
}
package com.gu.examples.word_count

import com.gu.hadoop._
import org.apache.hadoop.conf.Configured
import org.apache.hadoop.io.{ Text, LongWritable }
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input._
import org.apache.hadoop.mapreduce.lib.output._
import org.apache.hadoop.util.{ Tool, ToolRunner }
import scala.collection.JavaConversions._

/*
  WordCount
  ---------

  Input: TextInputFormat = FileInputFormat[LongWritable, Text]

  Map: (LongWritable, Text) -> (Text, LongWritable)
    Input rows are mapped to (token, 1) outputs, one per token on the line.

  Combiner: The reducer is algebraic, so it can be used as a combiner too.

  Reduce: (Text, LongWritable) -> (Text, LongWritable)
    Sum count values for each token text.

  Output: TextOutputFormat[Text, LongWritable]
    Text output files containing token and count entries.
*/
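
// Worked example (illustrative): a single input line "These are test tokens."
// produces map output (these,1), (are,1), (test,1), (tokens.,1); the reduce
// phase then sums the counts per token, so a token that appears twice across
// the input, e.g. "these", comes out as (these,2).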
object Main extends App {
  ToolRunner.run(new Job, args)
}

class JobMapper extends Mapper[LongWritable, Text, Text, LongWritable] {
  // The following is type voodoo for the map method signature.
  type MapperContext = Mapper[LongWritable, Text, Text, LongWritable]#Context

  override def map(key: LongWritable, value: Text, context: MapperContext) {
    value.toString split "\\s" foreach { token =>
      context.write(token.toLowerCase.toText, LongWritableNumeric.one)
    }
  }
}

class JobReducer extends Reducer[Text, LongWritable, Text, LongWritable] {
  // This makes the List[LongWritable] sum work in the reduce method.
  val n = implicitly[Numeric[LongWritable]]

  // The following is type voodoo for the reduce method signature.
  type ReducerContext = Reducer[Text, LongWritable, Text, LongWritable]#Context

  override def reduce(key: Text, values: java.lang.Iterable[LongWritable], context: ReducerContext) {
    context.write(key, values.toList.sum)
  }
}

class Job extends Configured with Tool {
  override def run(args: Array[String]): Int = {
    val params = ApplicationParameters(args)

    val job = HadoopJob("word-count")
      .withConf(getConf())
      .withJarClass[Job]
      .withInputFormat[TextInputFormat]
      .withMapper[JobMapper]
      .withCombiner[JobReducer]
      .withReducer[JobReducer]
      .withOutput[Text, LongWritable]
      .withOutputFormat[TextOutputFormat[Text, LongWritable]]
      .build()

    FileInputFormat.setInputPaths(job, new Path(params(0)))
    FileOutputFormat.setOutputPath(job, new Path(params(1)))

    if (job waitForCompletion true) 0 else 1
  }
}
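
// Running it (sketch; jar name and paths hypothetical): ToolRunner wires the
// Configuration into the Tool and GenericOptionsParser strips the generic
// options, so the job can be launched with something like
//
//   hadoop jar word-count.jar com.gu.examples.word_count.Main \
//     /path/to/input /path/to/output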
package com.gu.examples

import com.gu.hadoop._
import org.apache.hadoop.io._
import org.apache.hadoop.mapred.lib.KeyFieldBasedComparator
import org.apache.hadoop.mrunit.mapreduce._
import org.scalatest.FlatSpec
import org.scalatest.matchers.ShouldMatchers
import scala.collection.JavaConversions._

class WordCountTest extends FlatSpec with ShouldMatchers {

  "WordCount Mapper" should "count tokens" in {
    (new MapDriver[LongWritable, Text, Text, LongWritable])
      .withMapper(new word_count.JobMapper)
      .withInput(LongWritableNumeric.one, "These are test tokens.".toText)
      .withOutput("these".toText, LongWritableNumeric.one)
      .withOutput("are".toText, LongWritableNumeric.one)
      .withOutput("test".toText, LongWritableNumeric.one)
      .withOutput("tokens.".toText, LongWritableNumeric.one)
      .runTest()
  }

  it should "count twice for duplicate tokens" in {
    (new MapDriver[LongWritable, Text, Text, LongWritable])
      .withMapper(new word_count.JobMapper)
      .withInput(LongWritableNumeric.one, "These are test tokens. These are more tokens.".toText)
      .withOutput("these".toText, LongWritableNumeric.one)
      .withOutput("are".toText, LongWritableNumeric.one)
      .withOutput("test".toText, LongWritableNumeric.one)
      .withOutput("tokens.".toText, LongWritableNumeric.one)
      .withOutput("these".toText, LongWritableNumeric.one)
      .withOutput("are".toText, LongWritableNumeric.one)
      .withOutput("more".toText, LongWritableNumeric.one)
      .withOutput("tokens.".toText, LongWritableNumeric.one)
      .runTest()
  }

  "WordCount Reducer" should "combine single tokens" in {
    (new ReduceDriver[Text, LongWritable, Text, LongWritable])
      .withReducer(new word_count.JobReducer)
      .withInput("tokens".toText, List(LongWritableNumeric.one))
      .withOutput("tokens".toText, LongWritableNumeric.one)
      .runTest()
  }

  it should "combine duplicate tokens" in {
    (new ReduceDriver[Text, LongWritable, Text, LongWritable])
      .withReducer(new word_count.JobReducer)
      .withInput("tokens".toText, List(LongWritableNumeric.one, LongWritableNumeric.one))
      .withOutput("tokens".toText, LongWritableNumeric.two)
      .runTest()
  }

  "WordCount MapReducer" should "count tokens" in {
    // Output in key alphabetic order to test
    val keyOrder = new KeyFieldBasedComparator[Text, LongWritable].asInstanceOf[RawComparator[Text]]

    (new MapReduceDriver[LongWritable, Text, Text, LongWritable, Text, LongWritable])
      .withMapper(new word_count.JobMapper)
      .withCombiner(new word_count.JobReducer)
      .withReducer(new word_count.JobReducer)
      .withKeyOrderComparator(keyOrder)
      .withInput(LongWritableNumeric.one, "These are test tokens.".toText)
      .withOutput("are".toText, LongWritableNumeric.one)
      .withOutput("test".toText, LongWritableNumeric.one)
      .withOutput("these".toText, LongWritableNumeric.one)
      .withOutput("tokens.".toText, LongWritableNumeric.one)
      .runTest()
  }

  it should "count duplicate tokens" in {
    // Output in key alphabetic order to test
    val keyOrder = new KeyFieldBasedComparator[Text, LongWritable].asInstanceOf[RawComparator[Text]]

    (new MapReduceDriver[LongWritable, Text, Text, LongWritable, Text, LongWritable])
      .withMapper(new word_count.JobMapper)
      .withCombiner(new word_count.JobReducer)
      .withReducer(new word_count.JobReducer)
      .withKeyOrderComparator(keyOrder)
      .withInput(LongWritableNumeric.one, "These are test tokens. These are more tokens.".toText)
      .withOutput("are".toText, LongWritableNumeric.two)
      .withOutput("more".toText, LongWritableNumeric.one)
      .withOutput("test".toText, LongWritableNumeric.one)
      .withOutput("these".toText, LongWritableNumeric.two)
      .withOutput("tokens.".toText, LongWritableNumeric.two)
      .runTest()
  }
}