@johnynek
Last active December 31, 2015 04:29
How to do a cross product with more data than can fit in RAM (or when you only have an iterator over the items). This is one way to do joins on Hadoop.
// You can run this file with the Scala 2 script runner: scala join.scala
import scala.collection.BufferedIterator
def sortIterator[T: Ordering](it: Iterator[T]): Iterator[T] =
  // Hadoop does this sort for us (on disk), so this in-memory version is only
  // for illustration. For data larger than RAM you want an external sort,
  // something like: http://en.wikipedia.org/wiki/External_sorting
  it.toList.sorted.iterator
// An ordering that sorts Right first
implicit def ordE[T: Ordering, U: Ordering]: Ordering[Either[T, U]] =
  new Ordering[Either[T, U]] {
    def compare(left: Either[T, U], right: Either[T, U]): Int =
      (left, right) match {
        case (Left(_), Right(_))  => 1
        case (Right(_), Left(_))  => -1
        case (Left(l), Left(r))   => Ordering[T].compare(l, r)
        case (Right(l), Right(r)) => Ordering[U].compare(l, r)
      }
  }
/**
 * We get one big iterator (let's assume it is unsorted) and need to
 * separate it into the Ts and the Us. This implementation uses sorting,
 * which can scale because Hadoop supports on-disk sorting. You can also
 * imagine a simple in-memory approach that pulls the whole iterator into
 * memory and runs two filters to get out the Ts and the Us; see memSplit
 * for how something like that would work.
 */
def sortSplit[T: Ordering, U: Ordering](merged: Iterator[Either[T, U]]): (Iterator[T], Iterable[U]) = {
  @annotation.tailrec
  def toPair(sorted: Iterator[Either[T, U]],
             acc: List[U] = Nil): (Iterator[T], Iterable[U]) = {
    if (!sorted.hasNext) {
      (Iterator.empty, acc.reverse)
    }
    else {
      sorted.next() match {
        // Rights sort first: accumulate them, since this side must be kept.
        case Right(u) => toPair(sorted, u :: acc)
        // The first Left means no Rights remain: stream it and the rest lazily.
        case Left(t) => (Iterator(t) ++ sorted.collect { case Left(rest) => rest }, acc.reverse)
      }
    }
  }
  // Sort so every Right comes first, pull the Rights out into a list (probably
  // on disk in practice), and return the Lefts as a lazy iterator for the join.
  toPair(sortIterator(merged))
}
/** If the iterator is small, force it into memory to find all the left and
 * right elements:
 */
def memSplit[T, U](merged: Iterator[Either[T, U]]): (Iterator[T], Iterable[U]) = {
  val all = merged.toList // forces every element into memory
  (all.iterator.collect { case Left(t) => t }, all.collect { case Right(u) => u })
}
// This seems roundabout, but if map + sort + reduce is all you have, this is how you do it
def joinByMerge[T: Ordering, U: Ordering, R](it0: Iterator[T], it1: Iterator[U])(
    joinfn: (Iterator[T], Iterable[U]) => Iterator[R]): Iterator[R] = {
  val (iter, iterable) = sortSplit(it0.map(Left(_)) ++ it1.map(Right(_)))
  // If we know the iterators are both small, this works:
  // val (iter, iterable) = memSplit(it0.map(Left(_)) ++ it1.map(Right(_)))
  joinfn(iter, iterable)
}
val cross = joinByMerge(List(1, 2, 3).iterator,
                        List("hey", "you", "guys").iterator) { (l, r) =>
  for {
    i <- l
    j <- r
  } yield (i, j)
}
println(cross.toList)
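`sortIterator` only stands in for Hadoop's on-disk sort, and its comment points to external sorting. As a rough sketch of that technique (a hypothetical helper, not Hadoop's implementation): read the input in chunks small enough for memory, sort each chunk into a "run", then merge the runs with a min-heap. To keep it short, the runs here stay in memory; a real external sort would spill each run to a temporary file and stream it back.

```scala
import scala.collection.mutable

// Hypothetical names: ExternalSortSketch, externalSort. Not a Hadoop or stdlib API.
object ExternalSortSketch {
  def externalSort[T](it: Iterator[T], chunkSize: Int)(implicit ord: Ordering[T]): Iterator[T] = {
    require(chunkSize > 0, "chunkSize must be positive")
    // Phase 1: sort each bounded-size chunk independently.
    // Assumption: a real external sort would write each run to disk here.
    val runs: Vector[Iterator[T]] =
      it.grouped(chunkSize).map(chunk => chunk.sorted.iterator).toVector
    // Phase 2: k-way merge. The heap holds (current head, run index);
    // PriorityQueue dequeues the largest first, so reverse to get a min-heap.
    val byHead: Ordering[(T, Int)] = Ordering.by[(T, Int), T](_._1)
    val heap = mutable.PriorityQueue.empty[(T, Int)](byHead.reverse)
    for ((run, i) <- runs.zipWithIndex if run.hasNext) heap.enqueue((run.next(), i))
    new Iterator[T] {
      def hasNext: Boolean = heap.nonEmpty
      def next(): T = {
        val (value, i) = heap.dequeue()
        // Refill from the run we just consumed, if it has more elements.
        if (runs(i).hasNext) heap.enqueue((runs(i).next(), i))
        value
      }
    }
  }

  def main(args: Array[String]): Unit =
    println(externalSort(Iterator(5, 3, 9, 1, 4, 1, 8), chunkSize = 3).toList)
}
```

Running `main` prints `List(1, 1, 3, 4, 5, 8, 9)`. The heap holds one element per run, so the merge phase needs memory proportional to the number of runs rather than the number of elements.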
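`ordE` also orders values within each side, but `sortSplit` only relies on every Right arriving before every Left. A stable sort on the tag alone is enough for that, which drops the `Ordering[T]` and `Ordering[U]` requirements. The `tagSortSplit` below is a hypothetical variant written for illustration; like `sortIterator`, it sorts in memory.

```scala
// Hypothetical names: TagSortSplitSketch, tagSortSplit. Sorts in memory,
// purely for illustration, just like sortIterator in the script.
object TagSortSplitSketch {
  // Orders only by the Left/Right tag (Right = 0 sorts first). sortBy is stable,
  // so each side keeps its arrival order and no Ordering[T] or Ordering[U] is needed.
  def tagSortSplit[T, U](merged: Iterator[Either[T, U]]): (Iterator[T], Vector[U]) = {
    val sorted = merged.toVector.sortBy(e => if (e.isRight) 0 else 1).iterator.buffered
    // Buffer the Rights: this is the side a cross product must re-iterate.
    val rights = Vector.newBuilder[U]
    while (sorted.hasNext && sorted.head.isRight) {
      sorted.next() match {
        case Right(u) => rights += u
        case Left(_)  => () // unreachable: head was just checked to be a Right
      }
    }
    // Only Lefts remain; hand them back as a lazy iterator.
    (sorted.collect { case Left(t) => t }, rights.result())
  }

  def main(args: Array[String]): Unit = {
    val merged: Iterator[Either[Int, String]] =
      Iterator(Left(3), Right("hey"), Left(1), Right("you"))
    val (lefts, rights) = tagSortSplit(merged)
    println((lefts.toList, rights))
  }
}
```

The trade-off is that both sides come out in arrival order rather than sorted, which does not matter for a cross product.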
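The description calls this one way to do joins on Hadoop. In a keyed join, the same Left/Right trick runs once per join key: the map step tags each record with its key and its side, the shuffle groups records by key, and the reducer sees one side first, buffers it, and streams the other side past it. The sketch below simulates that flow with plain Scala collections. It uses no Hadoop API, and `ReduceSideJoinSketch`, `User` and `Click` are hypothetical names chosen for illustration. In a real MapReduce job, the per-key "Right first" order is usually obtained with a secondary sort on a composite (key, tag) key.

```scala
// Hypothetical names throughout: ReduceSideJoinSketch, User, Click.
// Plain Scala collections simulate the map/shuffle/reduce steps; no Hadoop API is used.
object ReduceSideJoinSketch {
  final case class User(id: Int, name: String)
  final case class Click(userId: Int, url: String)

  def join(clicks: Seq[Click], users: Seq[User]): Seq[(String, String)] = {
    // "map": tag every record with its join key and its side
    // (Left = click, streamed; Right = user, buffered).
    val tagged: Seq[(Int, Either[Click, User])] =
      clicks.map(c => (c.userId, Left(c): Either[Click, User])) ++
        users.map(u => (u.id, Right(u): Either[Click, User]))

    tagged
      .groupBy(_._1) // "shuffle": one group per join key
      .toSeq
      .sortBy(_._1) // deterministic key order, only for a readable demo
      .flatMap { case (_, group) =>
        // "reduce": order by tag only (Rights first); sortBy is stable.
        val values = group.map(_._2).sortBy(e => if (e.isRight) 0 else 1)
        val (rights, lefts) = values.span(_.isRight)
        val keyUsers = rights.collect { case Right(u) => u } // buffered for this key
        for {
          c <- lefts.collect { case Left(click) => click } // streamed
          u <- keyUsers
        } yield (u.name, c.url)
      }
  }

  def main(args: Array[String]): Unit = {
    val users = Seq(User(1, "ann"), User(2, "bob"))
    val clicks = Seq(Click(1, "/a"), Click(2, "/b"), Click(1, "/c"), Click(3, "/d"))
    println(join(clicks, users))
  }
}
```

Within each key this is exactly the cross product from the script. Keys with no buffered side (key 3 in the demo) produce nothing, giving inner-join semantics, and reducer memory is bounded by the largest per-key group on the buffered side.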