Skip to content

Instantly share code, notes, and snippets.

@johnynek
Created September 1, 2012 21:57
Show Gist options
  • Save johnynek/3588731 to your computer and use it in GitHub Desktop.
Save johnynek/3588731 to your computer and use it in GitHub Desktop.
MemoryGroupBuilder
package com.twitter.scalding
import cascading.tuple.Fields
import cascading.tuple.{Tuple => CTuple, TupleEntry}
class MemoryGroupBuilder(val input : List[TupleEntry],
val prevOutput : TupleEntry = new TupleEntry(),
override val sorting : Option[Fields] = None)
extends ReduceOperations[MemoryGroupBuilder] {
private def appendTupleEntry(f : Fields, tup : CTuple) : MemoryGroupBuilder = {
new MemoryGroupBuilder(input,
prevOutput.appendNew(new TupleEntry(f, tup)),
sorting)
}
def foldLeft[X,T](fieldDef : (Fields,Fields))(init : X)(fn : (X,T) => X)
(implicit setter : TupleSetter[X], conv : TupleConverter[T]) : MemoryGroupBuilder = {
val result = setter(input.view
.map { _.selectEntry(fieldDef._1) }
.map { conv(_) } // Now we have List[T]
.foldLeft(init)(fn))
appendTupleEntry(fieldDef._2, result)
}
def mapReduceMap[T,X,U](fieldDef : (Fields, Fields))(mapfn : T => X )(redfn : (X, X) => X)
(mapfn2 : X => U)(implicit startConv : TupleConverter[T],
middleSetter : TupleSetter[X],
middleConv : TupleConverter[X],
endSetter : TupleSetter[U]) : MemoryGroupBuilder = {
val result = endSetter(mapfn2(input.view
.map { _.selectEntry(fieldDef._1) }
.map { startConv(_) } // Now we have List[T]
.map { mapfn(_) }
.reduce { redfn(_,_) }))
appendTupleEntry(fieldDef._2, result)
}
// Perform an inner secondary sort
def sortBy(innerSort : Fields) = {
val newSorting = sorting match {
case None => Some(innerSort)
case Some(sf) => {
sf.append(innerSort)
Some(sf)
}
}
//here is our sorting function:
def teLt(left : TupleEntry, right : TupleEntry) : Boolean = {
newSorting.compare(left.selectTuple(newSorting), right.selectTuple(newSorting)) < 0
}
new MemoryGroupBuilder(input.sortWith { teLt(_,_) }, prevOutput, newSorting)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment