Skip to content

Instantly share code, notes, and snippets.

@alexandru
Created June 15, 2018 05:53
Show Gist options
  • Save alexandru/20d1c43c9ceacbfb9710203bcc4dabc9 to your computer and use it in GitHub Desktop.
Save alexandru/20d1c43c9ceacbfb9710203bcc4dabc9 to your computer and use it in GitHub Desktop.
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
import scala.concurrent.Future
import scala.util.control.NonFatal
import Ack.{Continue, Stop}
sealed trait Ack
object Ack {
case object Continue extends Ack
case object Stop extends Ack
}
trait Observer[-A] {
def onNext(a: A): Future[Ack]
def onComplete(): Unit
def onError(e: Throwable): Unit
}
final class Cancelable(f: () => Unit)
extends (() => Unit) {
private[this] val thunk = new AtomicReference(f)
def apply(): Unit = {
val ref = thunk.getAndSet(null)
if (ref != null) ref()
}
}
trait Observable[+A] { self =>
/**
* Characteristic function
*/
def subscribe(o: Observer[A]): Cancelable
final def map[B](f: A => B): Observable[B] =
(out: Observer[B]) => self.subscribe(new Observer[A] {
def onNext(a: A): Future[Ack] =
out.onNext(f(a))
def onComplete(): Unit =
out.onComplete()
def onError(e: Throwable): Unit =
out.onError(e)
})
final def filter(p: A => Boolean): Observable[A] =
(out: Observer[A]) => self.subscribe(new Observer[A] {
def onNext(a: A): Future[Ack] =
if (p(a)) out.onNext(a)
else Future.successful(Continue)
def onComplete(): Unit =
out.onComplete()
def onError(e: Throwable): Unit =
out.onError(e)
})
}
object Observable {
/**
* Converts any sequence into an Observable.
*/
def fromSeq[A](seq: Iterable[A]): Observable[A] =
(out: Observer[A]) => {
def loop(cursor: Iterator[A], wasCanceled: AtomicBoolean): Future[Unit] = {
var streamError = true
if (wasCanceled.get()) Future.unit else
try {
if (cursor.hasNext)
out.onNext(cursor.next()).flatMap {
case Continue => loop(cursor, wasCanceled)
case Stop => Future.unit
}
else {
streamError = false
out.onComplete()
Future.unit
}
} catch {
case NonFatal(e) if streamError =>
out.onError(e)
Future.unit
}
}
val wasCanceled = new AtomicBoolean()
loop(seq.iterator, wasCanceled)
new Cancelable(() => wasCanceled.set(false))
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment