Last active
October 2, 2015 06:05
-
-
Save tristanreid/ec76cc484c7550027f90 to your computer and use it in GitHub Desktop.
Examples of how (and how not) to call Scalding's ExecutionApp
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.twitter.scalding.typed.TypedPipe | |
import com.twitter.scalding._ | |
import scala.util.{Failure, Success} | |
// To understand what's going on, read the source of Execution and ExecutionApp (it's not long).
// Here are the highlights: ExecutionApp's main method runs `job` for you, as long as `job` returns an Execution[Unit].
// Execution: the key methods to know are flatMap, zip, and unit.
// unit: since you have to return an Execution[Unit] at some point, this is handy. It pairs nicely with .zip, for example.
// zip: combine Executions so they run in parallel, for fun and profit.
// flatMap: start with an Execution[T], get inside it to do some work with the T, and return an Execution[Whatever] (unit is nice here!).
/**
 * Smallest useful ExecutionApp: writes the strings "A", "B", "C" as a TSV
 * to the path passed via the `output` argument.
 *
 * `writeExecution` already produces an `Execution[Unit]`, so it can be the
 * result of the `getArgs` flatMap directly, with no extra wrapping.
 */
object TestExecutionApp extends ExecutionApp {
  override def job: Execution[Unit] =
    Execution.getArgs.flatMap { cliArgs =>
      val destination = cliArgs("output")
      val letters = TypedPipe.from(List("A", "B", "C"))
      // That's all we need: this is the Execution[Unit] ExecutionApp will run.
      letters.writeExecution(TypedTsv(destination))
    }
}
/**
 * Same output as TestExecutionApp, but the pipe is first lifted into an
 * Execution with `Execution.from`.
 *
 * Because the pipe is now inside an Execution, we reach into it with
 * `flatMap`; the inner `writeExecution` then becomes part of the returned
 * plan, so the write actually runs.
 */
object TestExecutionApp2 extends ExecutionApp {
  override def job: Execution[Unit] =
    Execution.getArgs.flatMap { cliArgs =>
      val destination = cliArgs("output")
      val letters = TypedPipe.from(List("A", "B", "C"))
      Execution.from(letters).flatMap(_.writeExecution(TypedTsv(destination)))
    }
}
/**
 * Counter-example: do NOT write your job like this.
 *
 * `onComplete` takes a callback whose result type is Unit, so whatever the
 * callback evaluates to is thrown away. The `writeExecution` built in the
 * `Success` branch is therefore never composed into the returned Execution
 * and never runs; only the println side effects happen. Use `flatMap`
 * instead (as TestExecutionApp2 does) to sequence the write.
 */
object TestExecutionApp2TheWrongWay extends ExecutionApp {
  override def job: Execution[Unit] =
    Execution.getArgs.flatMap { cliArgs =>
      // Looked up exactly like in the other examples, but never used below.
      val unusedDestination = cliArgs("output")
      val letters = TypedPipe.from(List("A", "B", "C"))
      val lifted = Execution.from(letters)
      val observed = lifted.onComplete {
        case Failure(_) =>
          println("oops")
        case Success(pipe) =>
          println("This will get called when your execution gets called")
          // Builds an Execution[Unit] and immediately drops it: it is never run.
          pipe.writeExecution(TypedTsv("doesntMatterWontGetCalled.tsv"))
      }
      // This Execution[Unit] runs (triggering the callback), but the
      // write created inside the callback is not part of it.
      observed.unit
    }
}
/**
 * Writes two independent pipes: strings to `output` and ints to `output2`.
 *
 * The two writes are combined with `zip` so they can execute in parallel.
 * `.unit` then discards the pair of Unit results, giving the
 * `Execution[Unit]` that ExecutionApp expects.
 */
object TestExecutionApp3 extends ExecutionApp {
  override def job: Execution[Unit] =
    Execution.getArgs.flatMap { cliArgs =>
      val letterPath = cliArgs("output")
      val numberPath = cliArgs("output2")
      val writeLetters = TypedPipe.from(List("A", "B", "C")).writeExecution(TypedTsv(letterPath))
      val writeNumbers = TypedPipe.from(List(1, 2, 3)).writeExecution(TypedTsv(numberPath))
      writeLetters.zip(writeNumbers).unit
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment