Using the Hadoop Java API from Scala.
package com.gu.hadoop

import org.apache.hadoop.util.GenericOptionsParser

object ApplicationParameters {
  def apply(args: Array[String]): List[String] = {
    new GenericOptionsParser(args).getRemainingArgs.toList
  }
}
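For illustration, a small sketch of how ApplicationParameters might be called; the ApplicationParametersSketch object, the -D option and the paths below are hypothetical, not part of the gist.

import com.gu.hadoop.ApplicationParameters

object ApplicationParametersSketch extends App {
  // Hypothetical command line: hadoop jar word-count.jar -D mapreduce.job.reduces=4 /data/in /data/out
  val rawArgs = Array("-D", "mapreduce.job.reduces=4", "/data/in", "/data/out")

  // GenericOptionsParser consumes the generic -D option and leaves the positional arguments.
  val params = ApplicationParameters(rawArgs) // List("/data/in", "/data/out")
  println(params)
}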
package com.gu.hadoop

import org.apache.hadoop.mapreduce._
import org.apache.hadoop.conf.Configuration

object HadoopJob {
  // Using typesafe builder pattern: http://blog.rafaelferreira.net/2008/07/type-safe-builder-pattern-in-scala.html

  // Some type aliases to reduce the amount of Java generic underscore nonsense
  type RawInputFormat = InputFormat[_, _]
  type RawOutputFormat = OutputFormat[_, _]
  type RawMapper = Mapper[_, _, _, _]
  type RawReducer = Reducer[_, _, _, _]

  // TODO: Ensure input/mapper/combiner/reducer/output types are compatible at compile time

  abstract class TRUE
  abstract class FALSE

  class JobSpecification[HasConf, HasJC, HasIF, HasM, HasMO, HasC, HasR, HasO, HasOF](
      val name: String,
      val configuration: Option[Configuration] = None,
      val jarClass: Option[Class[_]] = None,
      val inputFormatClass: Option[Class[_ <: RawInputFormat]] = None,
      val mapperClass: Option[Class[_ <: RawMapper]] = None,
      val mapOutputKeyClass: Option[Class[_]] = None,
      val mapOutputValueClass: Option[Class[_]] = None,
      val combinerClass: Option[Class[_ <: RawReducer]] = None,
      val reducerClass: Option[Class[_ <: RawReducer]] = None,
      val outputKeyClass: Option[Class[_]] = None,
      val outputValueClass: Option[Class[_]] = None,
      val outputFormatClass: Option[Class[_ <: RawOutputFormat]] = None) {

    def withConf(conf: Configuration) = {
      new JobSpecification[TRUE, HasJC, HasIF, HasM, HasMO, HasC, HasR, HasO, HasOF](
        name,
        Some(conf),
        jarClass,
        inputFormatClass,
        mapperClass,
        mapOutputKeyClass,
        mapOutputValueClass,
        combinerClass,
        reducerClass,
        outputKeyClass,
        outputValueClass,
        outputFormatClass
      )
    }

    def withJarClass[J]()(implicit mj: Manifest[J]) = {
      new JobSpecification[HasConf, TRUE, HasIF, HasM, HasMO, HasC, HasR, HasO, HasOF](
        name,
        configuration,
        Some(mj.erasure),
        inputFormatClass,
        mapperClass,
        mapOutputKeyClass,
        mapOutputValueClass,
        combinerClass,
        reducerClass,
        outputKeyClass,
        outputValueClass,
        outputFormatClass
      )
    }

    def withInputFormat[F <: RawInputFormat]()(implicit mf: Manifest[F]) = {
      new JobSpecification[HasConf, HasJC, TRUE, HasM, HasMO, HasC, HasR, HasO, HasOF](
        name,
        configuration,
        jarClass,
        Some(mf.erasure.asInstanceOf[Class[RawInputFormat]]),
        mapperClass,
        mapOutputKeyClass,
        mapOutputValueClass,
        combinerClass,
        reducerClass,
        outputKeyClass,
        outputValueClass,
        outputFormatClass
      )
    }

    def withMapper[M <: RawMapper]()(implicit mm: Manifest[M]) = {
      new JobSpecification[HasConf, HasJC, HasIF, TRUE, HasMO, HasC, HasR, HasO, HasOF](
        name,
        configuration,
        jarClass,
        inputFormatClass,
        Some(mm.erasure.asInstanceOf[Class[RawMapper]]),
        mapOutputKeyClass,
        mapOutputValueClass,
        combinerClass,
        reducerClass,
        outputKeyClass,
        outputValueClass,
        outputFormatClass
      )
    }

    def withMapOutput[K, V]()(implicit mk: Manifest[K], mv: Manifest[V]) = {
      // MapOutput is optional but needed with combiners
      new JobSpecification[HasConf, HasJC, HasIF, HasM, TRUE, HasC, HasR, HasO, HasOF](
        name,
        configuration,
        jarClass,
        inputFormatClass,
        mapperClass,
        Some(mk.erasure),
        Some(mv.erasure),
        combinerClass,
        reducerClass,
        outputKeyClass,
        outputValueClass,
        outputFormatClass
      )
    }

    def withCombiner[R <: RawReducer]()(implicit mr: Manifest[R]) = {
      new JobSpecification[HasConf, HasJC, HasIF, HasM, HasMO, TRUE, HasR, HasO, HasOF](
        name,
        configuration,
        jarClass,
        inputFormatClass,
        mapperClass,
        mapOutputKeyClass,
        mapOutputValueClass,
        Some(mr.erasure.asInstanceOf[Class[RawReducer]]),
        reducerClass,
        outputKeyClass,
        outputValueClass,
        outputFormatClass
      )
    }

    def withReducer[R <: RawReducer]()(implicit mr: Manifest[R]) = {
      new JobSpecification[HasConf, HasJC, HasIF, HasM, HasMO, HasC, TRUE, HasO, HasOF](
        name,
        configuration,
        jarClass,
        inputFormatClass,
        mapperClass,
        mapOutputKeyClass,
        mapOutputValueClass,
        combinerClass,
        Some(mr.erasure.asInstanceOf[Class[RawReducer]]),
        outputKeyClass,
        outputValueClass,
        outputFormatClass
      )
    }

    def withOutput[K, V]()(implicit mk: Manifest[K], mv: Manifest[V]) = {
      new JobSpecification[HasConf, HasJC, HasIF, HasM, HasMO, HasC, HasR, TRUE, HasOF](
        name,
        configuration,
        jarClass,
        inputFormatClass,
        mapperClass,
        mapOutputKeyClass,
        mapOutputValueClass,
        combinerClass,
        reducerClass,
        Some(mk.erasure),
        Some(mv.erasure),
        outputFormatClass
      )
    }

    def withOutputFormat[F <: RawOutputFormat]()(implicit mf: Manifest[F]) = {
      new JobSpecification[HasConf, HasJC, HasIF, HasM, HasMO, HasC, HasR, HasO, TRUE](
        name,
        configuration,
        jarClass,
        inputFormatClass,
        mapperClass,
        mapOutputKeyClass,
        mapOutputValueClass,
        combinerClass,
        reducerClass,
        outputKeyClass,
        outputValueClass,
        Some(mf.erasure.asInstanceOf[Class[RawOutputFormat]])
      )
    }
  }

  /*
    Only complete JobSpecifications can be built. The actual build method is
    private and only available by implicit conversion from JobSpecifications
    with the correct phantom typing:
      - Combiners are optional but need MapOutputs
      - Mappers and Reducers are optional
  */
  implicit def combinersNeedMapOutputs(jobSpecification: JobSpecification[TRUE, TRUE, TRUE, _, TRUE, TRUE, _, TRUE, TRUE]) = new {
    def build(): Job = buildJob(jobSpecification)
  }

  implicit def otherwiseMapOutputsAreOptional(jobSpecification: JobSpecification[TRUE, TRUE, TRUE, _, FALSE, _, _, TRUE, TRUE]) = new {
    def build(): Job = buildJob(jobSpecification)
  }

  private def buildJob(jobSpecification: JobSpecification[TRUE, TRUE, TRUE, _, _, _, _, TRUE, TRUE]): Job = {
    val job = new Job(jobSpecification.configuration.get)
    jobSpecification.jarClass foreach { job setJarByClass _ }

    job setJobName jobSpecification.name

    jobSpecification.inputFormatClass foreach { job setInputFormatClass _ }
    jobSpecification.mapperClass foreach { job setMapperClass _ }
    jobSpecification.combinerClass foreach { job setCombinerClass _ }
    jobSpecification.mapOutputKeyClass foreach { job setMapOutputKeyClass _ }
    jobSpecification.mapOutputValueClass foreach { job setMapOutputValueClass _ }
    jobSpecification.reducerClass foreach { job setReducerClass _ }
    jobSpecification.outputKeyClass foreach { job setOutputKeyClass _ }
    jobSpecification.outputValueClass foreach { job setOutputValueClass _ }
    jobSpecification.outputFormatClass foreach { job setOutputFormatClass _ }

    job
  }

  def apply(name: String) = builder(name)
  def builder(name: String) = new JobSpecification[FALSE, FALSE, FALSE, FALSE, FALSE, FALSE, FALSE, FALSE, FALSE](name)
}
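To illustrate what the phantom types enforce, here is a small sketch: build() is only made available by the implicit conversions once the required slots are TRUE, so an incomplete specification fails to compile rather than failing at job submission. The BuilderSketch object below is hypothetical; it reuses the word-count Job class defined further down.

package com.gu.examples.word_count

import com.gu.hadoop._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{ Text, LongWritable }
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

object BuilderSketch {
  // Incomplete specification: HasConf is still FALSE, so neither implicit
  // conversion applies and the commented call below is a compile error.
  val incomplete = HadoopJob("sketch").withInputFormat[TextInputFormat]
  // incomplete.build() // does not compile: value build is not a member

  // Complete specification: conf, jar class, input format, output types and
  // output format are all set, and no map output is given, so
  // otherwiseMapOutputsAreOptional supplies build().
  val complete = HadoopJob("sketch")
    .withConf(new Configuration)
    .withJarClass[Job]
    .withInputFormat[TextInputFormat]
    .withOutput[Text, LongWritable]
    .withOutputFormat[TextOutputFormat[Text, LongWritable]]
    .build()
}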
package com.gu.hadoop

import org.apache.hadoop.io.{ LongWritable, IntWritable, Text }

object `package` {
  implicit def any2Text[A](a: Any) = new {
    lazy val toText: Text = new Text(a.toString)
  }

  implicit object LongWritableNumeric extends Numeric[LongWritable] {
    val two = fromInt(2)
    val minusOne = fromInt(-1)

    def plus(n: LongWritable, m: LongWritable): LongWritable = fromLong(toLong(n) + toLong(m))
    def minus(n: LongWritable, m: LongWritable): LongWritable = plus(n, negate(m))
    def times(n: LongWritable, m: LongWritable): LongWritable = fromLong(toLong(n) * toLong(m))
    def negate(n: LongWritable): LongWritable = times(minusOne, n)

    def fromInt(n: Int): LongWritable = fromLong(n)
    def fromLong(n: Long): LongWritable = new LongWritable(n)

    def toInt(n: LongWritable): Int = toLong(n).toInt
    def toLong(n: LongWritable): Long = n.get
    def toFloat(n: LongWritable): Float = toLong(n).toFloat
    def toDouble(n: LongWritable): Double = toLong(n).toDouble

    def compare(n: LongWritable, m: LongWritable) = toLong(n) compare toLong(m)
  }
}
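A short sketch of what these implicits give you in practice; the PackageSketch object and the values are illustrative only. toText turns anything with a sensible toString into a Text, and the Numeric[LongWritable] instance lets standard collection arithmetic such as sum work directly on writables, which is what the word-count reducer below relies on.

import com.gu.hadoop._
import org.apache.hadoop.io.LongWritable

object PackageSketch extends App {
  // any2Text adds a toText conversion to any value.
  val word = "these".toText

  // LongWritableNumeric makes sum work directly on writables.
  val counts = List(new LongWritable(1), new LongWritable(2), new LongWritable(3))
  val total = counts.sum // LongWritable(6)

  println(word.toString + " -> " + total.toString)
}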
package com.gu.examples.word_count

import com.gu.hadoop._
import org.apache.hadoop.conf.Configured
import org.apache.hadoop.io.{ Text, LongWritable }
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input._
import org.apache.hadoop.mapreduce.lib.output._
import org.apache.hadoop.util.{ Tool, ToolRunner }

import scala.collection.JavaConversions._

/*
  WordCount
  ---------

  Input: TextInputFormat = FileInputFormat[LongWritable, Text]

  Map: (LongWritable, Text) -> (Text, LongWritable)
    Input rows are mapped to (token, 1) outputs, one per token on the line.

  Combiner: The reducer is algebraic, so it can be used as a combiner too.

  Reduce: (Text, LongWritable) -> (Text, LongWritable)
    Sum count values for each token text.

  Output: TextOutputFormat[Text, LongWritable]
    Text output files containing token and count entries.
*/

object Main extends App {
  ToolRunner.run(new Job, args)
}

class JobMapper extends Mapper[LongWritable, Text, Text, LongWritable] {
  // The following is type voodoo for the map method signature.
  type MapperContext = Mapper[LongWritable, Text, Text, LongWritable]#Context

  override def map(key: LongWritable, value: Text, context: MapperContext) {
    value.toString split "\\s" foreach { token =>
      context.write(token.toLowerCase.toText, LongWritableNumeric.one)
    }
  }
}

class JobReducer extends Reducer[Text, LongWritable, Text, LongWritable] {
  // This makes the List[LongWritable] sum work in the reduce method.
  val n = implicitly[Numeric[LongWritable]]

  // The following is type voodoo for the reduce method signature.
  type ReducerContext = Reducer[Text, LongWritable, Text, LongWritable]#Context

  override def reduce(key: Text, values: java.lang.Iterable[LongWritable], context: ReducerContext) {
    context.write(key, values.toList.sum)
  }
}

class Job extends Configured with Tool {
  override def run(args: Array[String]): Int = {
    val params = ApplicationParameters(args)

    val job = HadoopJob("word-count")
      .withConf(getConf())
      .withJarClass[Job]
      .withInputFormat[TextInputFormat]
      .withMapper[JobMapper]
      .withCombiner[JobReducer]
      .withReducer[JobReducer]
      .withOutput[Text, LongWritable]
      .withOutputFormat[TextOutputFormat[Text, LongWritable]]
      .build()

    FileInputFormat.setInputPaths(job, new Path(params(0)))
    FileOutputFormat.setOutputPath(job, new Path(params(1)))

    if (job waitForCompletion true) 0 else 1
  }
}
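For completeness, a hedged sketch of driving the word-count job programmatically rather than through Main; the Launcher object and the input and output paths are hypothetical.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.util.ToolRunner

object Launcher extends App {
  // Roughly equivalent to: hadoop jar word-count.jar /data/books /data/word-counts
  // ToolRunner parses generic options, injects the Configuration and calls Job.run
  // with the remaining arguments.
  val exitCode = ToolRunner.run(new Configuration, new com.gu.examples.word_count.Job,
    Array("/data/books", "/data/word-counts"))
  sys.exit(exitCode)
}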
package com.gu.examples

import com.gu.hadoop._
import org.apache.hadoop.io._
import org.apache.hadoop.mapred.lib.KeyFieldBasedComparator
import org.apache.hadoop.mrunit.mapreduce._
import org.scalatest.FlatSpec
import org.scalatest.matchers.ShouldMatchers

import scala.collection.JavaConversions._

class WordCountTest extends FlatSpec with ShouldMatchers {

  "WordCount Mapper" should "count tokens" in {
    (new MapDriver[LongWritable, Text, Text, LongWritable])
      .withMapper(new word_count.JobMapper)
      .withInput(LongWritableNumeric.one, "These are test tokens.".toText)
      .withOutput("these".toText, LongWritableNumeric.one)
      .withOutput("are".toText, LongWritableNumeric.one)
      .withOutput("test".toText, LongWritableNumeric.one)
      .withOutput("tokens.".toText, LongWritableNumeric.one)
      .runTest()
  }

  it should "count twice for duplicate tokens" in {
    (new MapDriver[LongWritable, Text, Text, LongWritable])
      .withMapper(new word_count.JobMapper)
      .withInput(LongWritableNumeric.one, "These are test tokens. These are more tokens.".toText)
      .withOutput("these".toText, LongWritableNumeric.one)
      .withOutput("are".toText, LongWritableNumeric.one)
      .withOutput("test".toText, LongWritableNumeric.one)
      .withOutput("tokens.".toText, LongWritableNumeric.one)
      .withOutput("these".toText, LongWritableNumeric.one)
      .withOutput("are".toText, LongWritableNumeric.one)
      .withOutput("more".toText, LongWritableNumeric.one)
      .withOutput("tokens.".toText, LongWritableNumeric.one)
      .runTest()
  }

  "WordCount Reducer" should "combine single tokens" in {
    (new ReduceDriver[Text, LongWritable, Text, LongWritable])
      .withReducer(new word_count.JobReducer)
      .withInput("tokens".toText, List(LongWritableNumeric.one))
      .withOutput("tokens".toText, LongWritableNumeric.one)
      .runTest()
  }

  it should "combine duplicate tokens" in {
    (new ReduceDriver[Text, LongWritable, Text, LongWritable])
      .withReducer(new word_count.JobReducer)
      .withInput("tokens".toText, List(LongWritableNumeric.one, LongWritableNumeric.one))
      .withOutput("tokens".toText, LongWritableNumeric.two)
      .runTest()
  }

  "WordCount MapReducer" should "count tokens" in {
    // Output in key alphabetic order to test
    val keyOrder = new KeyFieldBasedComparator[Text, LongWritable].asInstanceOf[RawComparator[Text]]

    (new MapReduceDriver[LongWritable, Text, Text, LongWritable, Text, LongWritable])
      .withMapper(new word_count.JobMapper)
      .withCombiner(new word_count.JobReducer)
      .withReducer(new word_count.JobReducer)
      .withKeyOrderComparator(keyOrder)
      .withInput(LongWritableNumeric.one, "These are test tokens.".toText)
      .withOutput("are".toText, LongWritableNumeric.one)
      .withOutput("test".toText, LongWritableNumeric.one)
      .withOutput("these".toText, LongWritableNumeric.one)
      .withOutput("tokens.".toText, LongWritableNumeric.one)
      .runTest()
  }

  it should "count duplicate tokens" in {
    // Output in key alphabetic order to test
    val keyOrder = new KeyFieldBasedComparator[Text, LongWritable].asInstanceOf[RawComparator[Text]]

    (new MapReduceDriver[LongWritable, Text, Text, LongWritable, Text, LongWritable])
      .withMapper(new word_count.JobMapper)
      .withCombiner(new word_count.JobReducer)
      .withReducer(new word_count.JobReducer)
      .withKeyOrderComparator(keyOrder)
      .withInput(LongWritableNumeric.one, "These are test tokens. These are more tokens.".toText)
      .withOutput("are".toText, LongWritableNumeric.two)
      .withOutput("more".toText, LongWritableNumeric.one)
      .withOutput("test".toText, LongWritableNumeric.one)
      .withOutput("these".toText, LongWritableNumeric.two)
      .withOutput("tokens.".toText, LongWritableNumeric.two)
      .runTest()
  }
}