Created
November 7, 2016 21:19
-
-
Save johnynek/7da4dc4b273bb7781e8ff659164347d3 to your computer and use it in GitHub Desktop.
Doing basic set operations in scalding
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // how to do set operations in scalding: | |
/**
 * Treats the items of a `TypedPipe` as a (distributed) set and filters other
 * pipes by membership in it.
 *
 * Both operations key each pipe by its items, so they require an
 * `Ordering[T]`.
 *
 * @param toPipe the pipe whose items define the set; duplicates in it are
 *               collapsed before joining
 * @tparam T the item type
 */
case class DSet[T](toPipe: TypedPipe[T]) {

  /**
   * Returns the items in `that` that are also in `toPipe`.
   *
   * The result is NOT de-duplicated: an item occurring `n` times in `that`
   * is emitted `n` times.
   *
   * @param that the pipe to filter
   * @param ord  ordering used to key the items
   * @return the items of `that` that occur in `toPipe`
   */
  def contains(that: TypedPipe[T])(implicit ord: Ordering[T]): TypedPipe[T] =
    toPipe.asKeys.sum // reduce all the values to a single value per set item
      .join(that.asKeys.size) // pair each set item with its occurrence count in `that`
      .flatMap { case (t, (_, sz)) => DSet.replicate(t, sz) }

  /**
   * Returns the items in `that` that are NOT in `toPipe`.
   *
   * The union of `contains(that)` and `notContains(that)` is the original
   * `that`, duplicates included.
   *
   * @param that the pipe to filter
   * @param ord  ordering used to key the items
   * @return the items of `that` that do not occur in `toPipe`
   */
  def notContains(that: TypedPipe[T])(implicit ord: Ordering[T]): TypedPipe[T] =
    toPipe.asKeys.sum
      .rightJoin(that.asKeys.size)
      .flatMap {
        // rightJoin keeps every key of the right side, so the count is a plain
        // Long; only the left (set) side is optional.
        case (t, (None, sz)) => DSet.replicate(t, sz)
        case (_, (Some(_), _)) => Iterator.empty // exists in toPipe
      }
}

object DSet {

  /**
   * Lazily yields `item` exactly `count` times (nothing when `count <= 0`).
   *
   * Occurrence counts are `Long`, while `Iterator.fill` only accepts an `Int`
   * length, so the repetition is driven by a `Long` counter instead of
   * narrowing the count.
   */
  private def replicate[A](item: A, count: Long): Iterator[A] =
    Iterator.iterate(0L)(_ + 1L).takeWhile(_ < count).map(_ => item)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment