Created
June 12, 2013 16:19
-
-
Save schmmd/5766839 to your computer and use it in GitHub Desktop.
Scoobi multi-stage MapReduce bug with LZO compression.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import AssemblyKeys._ // put this at the top of the file

// sbt build for the Scoobi + LZO reproduction job, packaged as a fat jar via sbt-assembly.
// Settings are separated by blank lines because sbt versions before 0.13.7 require it.

assemblySettings

scalaVersion := "2.10.1"

libraryDependencies += "com.nicta" %% "scoobi" % "0.7.0-RC2-cdh3"

libraryDependencies += "org.apache.hadoop" % "hadoop-lzo" % "0.4.13"

// HTTPS endpoints: these hosts no longer serve artifacts over plain HTTP, and newer sbt
// versions refuse insecure resolvers by default.
// NOTE(review): confirm the Scoobi releases repo is still published at the github.io URL.
resolvers ++= Seq(
  "nicta" at "https://nicta.github.io/scoobi/releases",
  "cloudera" at "https://repository.cloudera.com/content/repositories/releases",
  "Sonatype snapshots" at "https://oss.sonatype.org/content/repositories/snapshots/")

// Wherever the inherited strategy for a path is `deduplicate`, keep the first copy instead;
// every other per-path decision from the previous strategy is passed through unchanged.
mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
  {
    case x =>
      val oldstrat = old(x)
      if (oldstrat == MergeStrategy.deduplicate) MergeStrategy.first
      else oldstrat
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.nicta.scoobi.examples | |
import com.nicta.scoobi.Scoobi._ | |
import com.nicta.scoobi.io.text._ | |
import com.hadoop.mapreduce.LzoTextInputFormat | |
/**
 * Two-stage Scoobi job over LZO-compressed, tab-separated text.
 *
 * Stage 1 parses each `key\tvalue` line and counts how many lines share each key.
 * Stage 2 groups those (key, count) pairs by count and writes one tab-separated line
 * per distinct count: the count followed by every (key, count) pair with that count.
 *
 * Arguments:
 *  - `args(0)`: input path, read with `LzoTextInputFormat`
 *  - `args(1)`: output path, written with `overwrite = true`
 */
object Job extends ScoobiApp {
  def run(): Unit = {
    require(args.length >= 2, "usage: Job <input-path> <output-path>")
    val inputPath = args(0)
    val outputPath = args(1)

    // The cast only satisfies TextSource's `inputFormat` parameter type.
    // NOTE(review): presumably LzoTextInputFormat is not a TextInputFormat subclass and the
    // cast is unchecked at runtime (erasure) — confirm it produces (LongWritable, Text) records.
    val lines: DList[String] = TextInput.fromTextSource(
      new TextSource(Seq(inputPath),
        inputFormat = classOf[LzoTextInputFormat].asInstanceOf[Class[org.apache.hadoop.mapreduce.lib.input.TextInputFormat]]))

    // Parse each line into a (key, value) pair. The -1 limit keeps trailing empty fields,
    // so "key\t" parses as ("key", "") instead of collapsing to a one-element array.
    val mapped = lines.map { line =>
      line.split("\t", -1) match {
        case Array(first, second) => (first, second)
        case fields =>
          throw new IllegalArgumentException(
            s"expected 2 tab-separated fields but found ${fields.length}: $line")
      }
    }

    // (key, number of input lines carrying that key)
    val grouped = mapped.groupBy(_._1).map { case (key, values) =>
      (key, values.size)
    }

    // One output line per distinct count: the count, then each (key, count) pair in its
    // default toString form, all joined with tabs.
    val grouped2 = grouped.groupBy(_._2).map { case (key, values) =>
      (Iterable(key) ++ values).mkString("\t")
    }

    persist(grouped2.toTextFile(outputPath, overwrite = true))
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment