Created
September 12, 2012 08:47
-
-
Save robinp/3705334 to your computer and use it in GitHub Desktop.
scala running merge of streams
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
object merging {
  // Imported locally so that this object is self-contained: the file has no top-level import block.
  import java.util.concurrent.atomic.AtomicReference

  import scala.annotation.tailrec

  // Note: Tail-recursive stream functions should always be defined on objects, not traits.
  // See http://stackoverflow.com/questions/12486762/scala-tail-recursive-stream-processor-function-defined-in-trait-holds-reference

  /**
   * Merges two ordered streams, pairing up the elements whose keys are equal.
   *
   * The elements of the two streams can be of different types, but each needs a
   * mapping to a common key type `C`.
   *
   * Preconditions (not checked):
   *  - the mapped streams (the key sequences) must be ordered ascending;
   *  - the mapped streams should contain distinct values.
   *
   * The result is lazy: skipping over unpaired elements happens through tail calls
   * until a pair is found, and only then is a stream cell produced. Its tail is
   * computed on demand.
   *
   * @tparam A element type of the first stream
   * @tparam B element type of the second stream
   * @tparam C common key type the elements are compared by
   * @param as first stream to merge
   * @param bs second stream to merge
   * @param f maps elements of the first stream to an ordering key
   * @param g maps elements of the second stream to an ordering key
   * @param C ordering of the keys
   * @return the merged stream, where each pair consists of elements with equal keys.
   *         Unpaired elements are dropped.
   */
  @tailrec
  final def merge[A, B, C](as: Stream[A], bs: Stream[B])(f: A => C, g: B => C)(
      implicit C: Ordering[C]): Stream[(A, B)] =
    (as, bs) match {
      case (a #:: atails, b #:: btails) =>
        val keyA = f(a)
        val keyB = g(b)
        if (C.equiv(keyA, keyB)) {
          // Wrap the tails in holders, so that the lazily evaluated tail below captures
          // only the holders, which mergeLazy empties as soon as it starts running.
          val atailRef = new AtomicReference(atails)
          val btailRef = new AtomicReference(btails)
          // Tail calls end here, return a result. The right-hand side of #:: is
          // evaluated lazily, and it goes through mergeLazy instead of calling merge
          // directly, so that merge itself only contains self-calls in tail position.
          (a, b) #:: mergeLazy(atailRef, btailRef)(f, g)
        }
        // The smaller key has no partner on the other side: drop it and go on.
        else if (C.lt(keyA, keyB)) merge(atails, bs)(f, g)
        else merge(as, btails)(f, g)
      // At least one of the streams is empty, so no more pairs can be formed.
      case _ => Stream.empty
    }

  /**
   * Continues the merge with the streams stored in the given holders.
   *
   * Needed to make the compiler accept the tail-callness of `merge`.
   * Care is taken to avoid holding a reference to the stream heads (see inline comments).
   *
   * The holders are single-use: both are set to `null` by this call, so they must
   * not be passed to this method again.
   *
   * @param asRef holder of the first stream to merge; emptied by this call
   * @param bsRef holder of the second stream to merge; emptied by this call
   * @param f maps elements of the first stream to an ordering key
   * @param g maps elements of the second stream to an ordering key
   * @param C ordering of the keys
   * @return the result of `merge` applied to the streams taken from the holders
   */
  final def mergeLazy[A, B, C](asRef: AtomicReference[Stream[A]], bsRef: AtomicReference[Stream[B]])(
      f: A => C, g: B => C)(implicit C: Ordering[C]): Stream[(A, B)] = {
    // Passes the stream head references to the merge function and immediately deletes them
    // from the holders, so that the mergeLazy stack frame doesn't keep them referred and
    // they can be GC'd while the call is running.
    //
    // Note: don't put the unwrapped streams into local variables, obviously.
    merge(asRef.getAndSet(null), bsRef.getAndSet(null))(f, g)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment