Last active
December 31, 2015 04:29
-
-
Save johnynek/7934441 to your computer and use it in GitHub Desktop.
How to do a cross product with more data than can fit in RAM (or with only an iterator over the items). This is one way to do joins on Hadoop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // You can run this file with: scala join.scala | |
| import scala.collection.BufferedIterator | |
/**
 * Returns the elements of `it` in ascending order according to the implicit
 * `Ordering[T]`.
 *
 * This version is for illustration only: it pulls every element into memory
 * before sorting. Hadoop performs this step itself; on disk you would want
 * something like an external sort:
 * http://en.wikipedia.org/wiki/External_sorting
 */
def sortIterator[T:Ordering](it: Iterator[T]): Iterator[T] = {
  val inMemory = it.toList
  inMemory.sorted(implicitly[Ordering[T]]).iterator
}
/**
 * An ordering on `Either[T, U]` that places every `Right` before every `Left`.
 * Two values on the same side are compared with that side's own ordering.
 */
implicit def ordE[T:Ordering, U:Ordering]: Ordering[Either[T,U]] =
  new Ordering[Either[T,U]] {
    def compare(left: Either[T,U], right: Either[T,U]): Int =
      left match {
        case Right(lu) =>
          right match {
            case Right(ru) => Ordering[U].compare(lu, ru)
            // a Right always comes before a Left
            case Left(_) => -1
          }
        case Left(lt) =>
          right match {
            case Left(rt) => Ordering[T].compare(lt, rt)
            // a Left always comes after a Right
            case Right(_) => 1
          }
      }
  }
/**
 * Separates one big (assumed unsorted) iterator of tagged values into its
 * `Left` side and its `Right` side.
 *
 * The input is sorted so that all Rights come first. The Rights are then
 * collected into a list, and the remaining Lefts are handed back as an
 * iterator. Sorting is used because it can scale (Hadoop supports on-disk
 * sorting); for an in-memory alternative that simply filters twice, see
 * memSplit.
 *
 * @return the sorted Lefts as an iterator and the sorted Rights as an Iterable
 */
def sortSplit[T:Ordering,U:Ordering](merged: Iterator[Either[T,U]]): (Iterator[T], Iterable[U]) = {
  // Consume the leading run of Rights, peeking at the head so that the first
  // Left (if any) stays in the iterator.
  @annotation.tailrec
  def drainRights(in: BufferedIterator[Either[T,U]], seen: List[U]): List[U] =
    if (!in.hasNext) seen.reverse
    else in.head match {
      case Right(u) =>
        in.next()
        drainRights(in, u :: seen)
      case Left(_) => seen.reverse
    }

  // do the sort, pull out the right side in a list (on disk probably);
  // whatever is left in the iterator afterwards is the left side
  val ordered = sortIterator(merged).buffered
  val rights = drainRights(ordered, Nil)
  (ordered.collect { case Left(t) => t }, rights)
}
/**
 * If the iterator is small, force it in memory to find all the left and right
 * elements.
 *
 * The whole input is materialized eagerly with `toList`, so `merged` is fully
 * consumed by the time this method returns and both results can be traversed
 * independently. (`Iterator.toIterable` is not used because it does not
 * promise a strict collection; on older Scala versions it yields a lazily
 * evaluated Stream, which leaves the source iterator only partially read.)
 *
 * @param merged iterator of tagged values; exhausted by this call
 * @return the Lefts as an iterator and the Rights as an Iterable, each in
 *         their original relative order
 */
def memSplit[T,U](merged: Iterator[Either[T,U]]): (Iterator[T], Iterable[U]) = {
  val items = merged.toList
  (items.iterator.collect { case Left(t) => t }, items.collect { case Right(u) => u })
}
/**
 * Joins two iterators by merging them into a single tagged stream, splitting
 * that stream back apart with sortSplit, and applying `joinfn` to the pieces.
 *
 * Seems dumb to do this, but if you only have map + sort + reduce, this is
 * what you do.
 *
 * @param it0    values that end up on the iterator side of `joinfn`
 * @param it1    values that end up on the Iterable side of `joinfn`
 * @param joinfn combines the split-out sides into the joined result
 */
def joinByMerge[U:Ordering,T:Ordering,R](it0: Iterator[T], it1: Iterator[U])(
  joinfn: (Iterator[T], Iterable[U]) => Iterator[R]): Iterator[R] = {
  // Tag each side so both fit in one iterator.
  val lefts: Iterator[Either[T,U]] = it0.map(t => Left(t))
  val rights: Iterator[Either[T,U]] = it1.map(u => Right(u))
  // If we know the iterators are both small, memSplit(lefts ++ rights)
  // works here as well.
  sortSplit(lefts ++ rights) match {
    case (ts, us) => joinfn(ts, us)
  }
}
// Demo: the cross product of three ints and three strings, built by pairing
// every element of the iterator side with every element of the Iterable side.
val cross = joinByMerge(List(1, 2, 3).iterator,
  List("hey", "you", "guys").iterator) { (lefts, rights) =>
  lefts.flatMap { i =>
    rights.map { j => (i, j) }
  }
}
println(cross.toList)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment