Last active
March 12, 2017 06:48
-
-
Save quelgar/0e4747ccb05a531d1b5f05c341c1cc66 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import monix.reactive._ | |
import monix.execution._ | |
import Ack.{Stop, Continue} | |
import scala.util.control.NonFatal | |
import scala.concurrent._ | |
import scala.concurrent.duration._ | |
def feed[A](in: Iterator[A], out: Observer[A]) | |
(implicit s: Scheduler): Future[Ack] = { | |
// Indicates whether errors that happen inside the | |
// logic below should be streamed downstream with | |
// onError, or whether we aren't allowed because of | |
// the grammar. Basically we need to differentiate | |
// between errors triggered by our data source, the | |
// Iterator, and errors triggered by our Observer, | |
// which isn't allowed to triggered exceptions. | |
var streamErrors = true | |
try { | |
// Iterator protocol, we need to ask if it hasNext | |
if (!in.hasNext) { | |
// From this point on, we aren't allowed to call onError | |
// because it can break the contract | |
streamErrors = false | |
// Signaling the end of the stream, then we are done | |
out.onComplete() | |
Thread.dumpStack() | |
Stop | |
} else { | |
// Iterator protocol, we get a next element | |
val next = in.next() | |
// From this point on, we aren't allowed to call onError | |
// because it can break the contract | |
streamErrors = false | |
// Signaling onNext, then back-pressuring | |
out.onNext(next).flatMap { | |
case Continue => | |
// We got permission, go next | |
feed(in, out)(s) | |
case Stop => | |
// Nothing else to do, stop the loop | |
Stop | |
} | |
} | |
} catch { | |
case NonFatal(ex) => | |
// The Iterator triggered the error, so stream it | |
if (streamErrors) | |
out.onError(ex) | |
else // The Observer triggered the error, so log it | |
s.reportFailure(ex) | |
// Nothing else to do | |
Stop | |
} | |
} | |
import monix.execution.Scheduler.Implicits.global | |
// stack overflow: (fixed in 2.2.3, see https://github.com/monix/monix/issues/330) | |
Await.result(feed(Iterator.range(1, 10000), Observer.empty), 2.minutes) | |
// works: | |
//Await.result(Observer.feed(Observer.empty, 1 to 10000), 2.minutes) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment