@remeniuk
Created August 10, 2012 11:51
import com.twitter.scalding._
import org.apache.hadoop.io.{IntWritable, Text}

class Indexer(args: Args) extends Job(args) {
  // Sequence file of (Text userId, IntWritable idx) pairs
  val output = WritableSequenceFile(args("output"), classOf[Text], classOf[IntWritable],
    ('userId, 'idx))

  TextLine(args("input")).read
    .map(('offset, 'line) -> ('userId, 'idx)) {
      // The TextLine source emits every dictionary line together with an 'offset field
      // out of the box. In my case the offsets were multiples of 5 (probably byte offsets
      // of fixed-width lines rather than line numbers), so I divide them by 5.
      tuple: (Int, String) => (new Text(tuple._2), new IntWritable(tuple._1 / 5))
    }
    .project(('userId, 'idx)) // only the (userId, idx) pair is passed to the output
    .write(output)
}
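
A possible explanation for the factor of 5, offered as an assumption rather than something the job itself confirms: on Hadoop, a text-line source typically reports the byte offset at which each line starts, not its line number. If every dictionary line were, say, a 4-character ID followed by a newline, the offsets would come out as 0, 5, 10, and so on. The plain-Scala sketch below illustrates that arithmetic; `OffsetDemo`, `lineOffsets`, and the sample IDs are hypothetical and not part of the job.

```scala
object OffsetDemo {
  // Hypothetical helper: byte offset at which each line starts,
  // assuming UTF-8 text with a single '\n' after every line.
  def lineOffsets(lines: Seq[String]): Seq[Long] =
    lines.scanLeft(0L)((acc, l) => acc + l.getBytes("UTF-8").length + 1).init

  def main(args: Array[String]): Unit = {
    val ids = Seq("u001", "u002", "u003") // made-up fixed-width IDs
    val offsets = lineOffsets(ids)
    println(offsets.mkString(", "))            // prints "0, 5, 10"
    println(offsets.map(_ / 5).mkString(", ")) // prints "0, 1, 2"
  }
}
```

If the lines are not all the same width, dividing by a constant no longer gives consecutive indices, so this only holds under the fixed-width assumption.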