@johnynek
Created November 7, 2016 21:19
Doing basic set operations in Scalding
// How to do set operations in Scalding.
import com.twitter.scalding.typed.TypedPipe

case class DSet[T](toPipe: TypedPipe[T]) {
  // Returns the items in `that` that are also in toPipe.
  // Duplicates in `that` are kept (the result is not de-duplicated).
  def contains(that: TypedPipe[T])(implicit ord: Ordering[T]): TypedPipe[T] =
    toPipe.asKeys.sum // reduce all the values to a single value per key, so each set item appears once
      .join(that.asKeys.size) // size counts, as a Long, how often each item occurs in `that`
      .flatMap { case (t, (_, sz)) => Iterator.fill(sz.toInt)(t) } // assumes each count fits in an Int

  // Returns the items in `that` that are not in toPipe. The union
  // of contains and notContains is the original `that`.
  def notContains(that: TypedPipe[T])(implicit ord: Ordering[T]): TypedPipe[T] =
    toPipe.asKeys.sum
      .rightJoin(that.asKeys.size) // right join: only the left (toPipe) side is an Option
      .flatMap {
        case (t, (None, sz)) => Iterator.fill(sz.toInt)(t)
        case (_, (Some(_), _)) => Iterator.empty // exists in toPipe
      }
}
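
// A minimal sketch of the semantics of contains and notContains, using plain Scala
// collections instead of TypedPipe so it can be run without Scalding. DSetModel,
// DSetModelExample and the sample values are hypothetical names introduced for
// illustration only. Scalding does not guarantee output order, whereas this model
// happens to keep the order of `that`.
object DSetModel {
  // Items of `that` that are in `set`; duplicates in `that` are preserved.
  def contains[T](set: Seq[T], that: Seq[T]): Seq[T] = {
    val keys = set.toSet // plays the role of toPipe.asKeys.sum: one entry per distinct item
    that.filter(keys)
  }

  // Items of `that` that are not in `set`; duplicates in `that` are preserved.
  def notContains[T](set: Seq[T], that: Seq[T]): Seq[T] = {
    val keys = set.toSet
    that.filterNot(keys)
  }
}

object DSetModelExample extends App {
  val set = Seq(1, 2, 2, 3)
  val that = Seq(2, 2, 4, 5, 5)
  val kept = DSetModel.contains(set, that)       // Seq(2, 2)
  val dropped = DSetModel.notContains(set, that) // Seq(4, 5, 5)
  // Together the two results hold exactly the items of `that`.
  assert((kept ++ dropped).sorted == that.sorted)
  println(s"contains: $kept, notContains: $dropped")
}

// With the DSet class above, the corresponding pipeline calls would look like:
//   DSet(TypedPipe.from(set)).contains(TypedPipe.from(that))
//   DSet(TypedPipe.from(set)).notContains(TypedPipe.from(that))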