Skip to content

Instantly share code, notes, and snippets.

@sscdotopen
Created November 12, 2013 15:10
Show Gist options
  • Save sscdotopen/7432435 to your computer and use it in GitHub Desktop.
Save sscdotopen/7432435 to your computer and use it in GitHub Desktop.
package eu.stratosphere.scala.examples.wordcount
import eu.stratosphere.scala.{ScalaPlan, TextFile}
import eu.stratosphere.pact.common.plan.PlanAssembler
import eu.stratosphere.scala._
import eu.stratosphere.scala.operators._
/**
 * An author record, read from the authors CSV input.
 *
 * NOTE(review): field order presumably determines the column mapping used by
 * `CsvInputFormat` — keep it stable (TODO confirm).
 *
 * @param id   key that `Book.authorId` is joined against
 * @param name author name, emitted in the joined output
 */
case class Author(
  id: Int,
  name: String
)
/**
 * A book record, read from the books CSV input.
 *
 * NOTE(review): field order presumably determines the column mapping used by
 * `CsvInputFormat` — keep it stable (TODO confirm).
 *
 * @param authorId key joined against `Author.id`
 * @param year     year value, emitted in the joined output
 * @param title    book title, emitted in the joined output
 */
case class Book(
  authorId: Int,
  year: Int,
  title: String
)
/**
 * Plan assembler that joins books with their authors on `Author.id == Book.authorId`.
 * It writes one `(author name, book title, year)` CSV record per matching pair.
 *
 * Program arguments: `<booksFile> <authorsFile> <output> <numSubTasks>`.
 */
class BookAuthorJoin extends PlanAssembler with Serializable {

  /**
   * Overrides `PlanAssembler.getPlan`: parses the program arguments and builds the plan.
   *
   * @param args books file path, authors file path, output path, and the
   *             number of sub-tasks (parsed as an `Int`)
   * @throws IllegalArgumentException if fewer than four arguments are given
   * @throws NumberFormatException if the fourth argument is not an integer
   */
  override def getPlan(args: String*) = {
    // Fail fast with a usage hint instead of an opaque index-out-of-bounds error.
    require(args.length >= 4,
      "Usage: BookAuthorJoin <booksFile> <authorsFile> <output> <numSubTasks>")
    getScalaPlan(args(0), args(1), args(2), args(3).toInt)
  }

  /**
   * Builds the book/author join plan.
   *
   * @param booksFile   path of the CSV input parsed as [[Book]] records
   * @param authorsFile path of the CSV input parsed as [[Author]] records
   * @param output      path the joined `(String, String, Int)` CSV records are written to
   * @param numSubTasks passed as the third argument of `ScalaPlan`
   *                    (presumably the default parallelism — confirm)
   * @return a plan with a single CSV sink
   */
  def getScalaPlan(booksFile: String, authorsFile: String, output: String, numSubTasks: Int) = {
    val authors = DataSource(authorsFile, CsvInputFormat[Author]())
    val books = DataSource(booksFile, CsvInputFormat[Book]())

    // `map {` must stay on the same line as the join expression.
    // A line that begins with `map` after a closing `}` triggers semicolon
    // inference, which would split it off into a separate, invalid statement.
    val booksByAuthor = authors join books where { _.id } isEqualTo { _.authorId } map {
      case (author, book) => (author.name, book.title, book.year)
    }

    // Named `sink` rather than `output`: reusing `output` here would shadow the
    // path parameter and make the right-hand side refer to this value itself.
    val sink = booksByAuthor.write(output, CsvOutputFormat[(String, String, Int)]())

    ScalaPlan(Seq(sink), "Book-Author-Join", numSubTasks)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment