Skip to content

Instantly share code, notes, and snippets.

@alexy
Created January 12, 2012 21:28
Show Gist options
  • Save alexy/1603242 to your computer and use it in GitHub Desktop.
Save alexy/1603242 to your computer and use it in GitHub Desktop.
Sequence File Output for key Text and value Text
/**
 * Turns a single value into a key/value pair.
 *
 * @tparam K type of the produced key
 * @tparam V type of the produced value
 * @tparam S type of the value being converted
 */
trait OutputConverter[K, V, S] {

  /**
   * Converts `s` into a `(key, value)` tuple.
   *
   * @param s the value to convert
   * @return the key and value derived from `s`
   */
  def toKeyValue(s: S): (K, V)
}
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
/**
 * Describes an output destination: where records go, which file output format
 * writes them, and how a value of type `B` becomes a `(K, V)` pair.
 *
 * @tparam K key type handled by the output format
 * @tparam V value type handled by the output format
 * @tparam B type of the values fed into [[converter]]
 */
trait DataSink[K, V, B] {

  /** Name of the output type (presumably of `B`; confirm against implementors). */
  def outputTypeName: String

  /** Path associated with this sink's output. */
  def outputPath: Path

  /** Class of the `FileOutputFormat` (over `K` and `V`) used for this sink. */
  def outputFormat: Class[_ <: FileOutputFormat[K,V]]

  /** Converter that maps each `B` to the `(K, V)` pair handed to the output format. */
  def converter: OutputConverter[K, V, B]
}
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
// SequenceFileOutput.scala
/**
 * Creates a [[DListPersister]] for a `DList` of key/value pairs whose output
 * store uses `SequenceFileOutputFormat[WtK, WtV]`.
 *
 * Each `(K, V)` element is converted with the supplied functions: `putK` is
 * applied to the first tuple component and `putV` to the second.
 *
 * NOTE(review): `typeName` is resolved from the enclosing scope, which is not
 * visible in this snippet.
 *
 * @tparam K   key type of the list elements
 * @tparam V   value type of the list elements
 * @tparam WtK `Writable` type produced for keys
 * @tparam WtV `Writable` type produced for values
 * @param dl   the distributed list to persist
 * @param path output location; wrapped in a `Path` by the output store
 * @param putK converts a key into its `Writable` form
 * @param putV converts a value into its `Writable` form
 * @return a persister pairing `dl` with the sequence-file output store factory
 */
def toSequenceFile2[K : WireFormat, V : WireFormat, WtK <: Writable, WtV <: Writable]
    (dl: DList[Tuple2[K, V]], path: String)
    (putK: K => WtK, putV: V => WtV)
    : DListPersister[(K, V)] = {

  val sequenceFilePersister = new Persister[(K, V)] {
    def mkOutputStore(node: AST.Node[(K, V)]) = new OutputStore(node) {
      def outputTypeName = typeName
      def outputPath = new Path(path)
      def outputFormat = classOf[SequenceFileOutputFormat[WtK, WtV]]
      def converter = new OutputConverter[WtK, WtV, (K, V)] {
        def toKeyValue(pair: (K, V)) = {
          // Key is converted before the value, matching tuple order.
          val writableKey = putK(pair._1)
          val writableValue = putV(pair._2)
          (writableKey, writableValue)
        }
      }
    }
  }

  new DListPersister(dl, sequenceFilePersister)
}
// usage:
// Persist `scores` to `outputFile`, wrapping every key and every value in a Text.
DList.persist(
  SequenceFileOutput.toSequenceFile2(scores, outputFile)(
    key => new Text(key),
    value => new Text(value)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment