Skip to content

Instantly share code, notes, and snippets.

@Antwnis
Created August 19, 2014 10:40
Show Gist options
  • Save Antwnis/4953c1effc38800e9d0b to your computer and use it in GitHub Desktop.
Save Antwnis/4953c1effc38800e9d0b to your computer and use it in GitHub Desktop.
Hashing all Fields in Scalding
import cascading.pipe.Pipe
import cascading.tuple.{TupleEntry, Fields}
import com.twitter.scalding._
// Custom pipe operations: the trait below defines them, and the object after it
// exposes them on pipes via implicit conversions.
/**
 * Reusable custom operation that hashes every field of each record in a pipe.
 *
 * Mix this trait into a wrapper that supplies `self`; the operation is then
 * available as `pipe.generateHash`.
 */
trait HashOperations extends FieldConversions {

  /** The pipe the operations in this trait are applied to. */
  def self: RichPipe

  /**
   * Adds a `'hash` field computed from all fields of each incoming record.
   *
   * The value is `hashCode()` of the record's Cascading `Tuple`, an `Int`. Because it
   * is only 32 bits wide, different records can produce the same hash, so treat it as
   * a change-detection hint rather than a unique key.
   *
   * Uses Scalding `map` (not `mapTo`), so the original fields are kept and `'hash` is
   * added alongside them.
   *
   * @return the pipe with the extra `'hash` field
   */
  def generateHash: Pipe =
    self.map(Fields.ALL -> 'hash) { te: TupleEntry =>
      // Hash right away instead of holding on to the entry: Cascading may reuse
      // TupleEntry instances between records.
      // No per-record println here: callers that want to inspect rows add `.debug`.
      te.getTuple.hashCode()
    }
}
/**
 * Implicit conversions that expose [[HashOperations]] on Scalding and Cascading pipes.
 *
 * `import HashOperationsWrapper._` to call `generateHash` directly on a pipe.
 */
object HashOperationsWrapper {

  /** Adds the hash operations to any `RichPipe`. */
  implicit class HashOperationsWrapper(val self: RichPipe)
    extends HashOperations
    with Serializable

  /** Adds the hash operations to a raw Cascading `Pipe` by first wrapping it in a `RichPipe`. */
  implicit def wrapPipe(self: cascading.pipe.Pipe): HashOperationsWrapper = {
    val richPipe = new RichPipe(self)
    new HashOperationsWrapper(richPipe)
  }
}
/**
 * Demo job that generates 50 synthetic rows, appends a `'hash` field with the
 * custom `generateHash` operation, prints the rows with `debug`, and writes them as
 * TSV to `dummy-output`.
 */
class GenerateDeltaData(args: Args) extends Job(args) {
  // Brings the implicit conversion that adds `generateHash` to a Pipe into scope.
  import HashOperationsWrapper._

  /** Output columns of every generated row, in order. */
  val SCHEMA = List('hbasekey, 'column1, 'column2, 'column3)

  /** The numbers 1 to 50 in random order; each one becomes one generated row. */
  val updateList = scala.util.Random.shuffle((1 to 50).toList)

  /** The generated rows with `'hash` appended; the pipe returned by `write`. */
  val updatePipe = IterableSource[Int](updateList, 'number)
    // mapTo takes a (from -> to) fields pair: consume 'number and emit only SCHEMA.
    .mapTo('number -> SCHEMA) { x: Int =>
      // Zero-pad to seven digits, e.g. 42 -> "SYSTEM-YYYYmmDD-hash0000042".
      // NOTE(review): "YYYYmmDD" is emitted literally, not as a date — confirm intended.
      val newHBaseKey = f"SYSTEM-YYYYmmDD-hash$x%07d"
      (newHBaseKey, "COL1", "COL2", "2014-08-01 10:00:00.000")
    }
    // Use the reusable custom operation.
    .generateHash
    .debug
    .write(Tsv("dummy-output"))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment