Last active
December 18, 2015 23:09
-
-
Save johnynek/5859909 to your computer and use it in GitHub Desktop.
A simple API using Future and Promise to create an async stream.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// could easily be ported to Scala Future | |
import com.twitter.util.{Promise, Future} | |
/**
 * An asynchronous stream: the current element is available immediately via
 * `head`, while the remainder of the stream is delivered through a `Future`.
 *
 * A source whose `head` is `None` marks the end of the stream. The
 * combinators defined here never request the `tail` of such a source.
 */
trait Source[+T] { self =>
  /** The current element, or `None` when the stream has ended. */
  def head: Option[T]

  /** The remainder of the stream, available asynchronously. */
  def tail: Future[Source[T]]

  /**
   * Transforms every element with `fn`.
   *
   * The result is a view: `fn` is applied each time `head` of the mapped
   * source is read, so callers should read `head` once and keep the value.
   */
  def map[U](fn: T => U): Source[U] = new Source[U] {
    def head: Option[U] = self.head.map(fn)
    def tail: Future[Source[U]] = self.tail.map { _.map(fn) }
  }

  /**
   * Appends `that` after the last element of this source.
   *
   * @param that the stream to continue with once this one has ended
   * @return `that` itself when this source is already at its end, otherwise
   *         a source with the same head whose tail is recursively concatenated
   */
  def concat[U >: T](that: Future[Source[U]]): Future[Source[U]] =
    // Read `head` exactly once; on a mapped source every read re-runs the mapping function.
    head match {
      case None => that
      case current =>
        Future.value(new Source[U] {
          def head: Option[U] = current
          def tail: Future[Source[U]] = self.tail.flatMap { _.concat(that) }
        })
    }

  /**
   * Folds the stream from left to right with an asynchronous step function.
   *
   * @param init the initial accumulator
   * @param fn   combines the accumulator with the next element
   * @return `init` when the stream has ended, otherwise the accumulator
   *         obtained after every element has been folded in
   */
  def foldLeft[U](init: Future[U])(fn: (U, T) => Future[U]): Future[U] =
    // Matching once avoids both `Option.get` and a second evaluation of `head`.
    head match {
      case None => init
      case Some(current) =>
        for {
          acc <- init
          rest <- tail
          res <- rest.foldLeft(fn(acc, current))(fn)
        } yield res
    }

  // The monad is on Future[Source[T]]
  //def flatMap[U](fn: T => Future[Source[U]]): Future[Source[U]]
  // TODO: add methods in terms of the above here:
  // take(n: Int): Source[T], takeWhile, drop, scan
  // cue the monad discussion that we really just want a Stream[M[_], T] with some Monad[M]
}
/** Constructors for [[Source]]. */
object Source {
  /**
   * The ended stream: it has no head, and asking for its tail yields a
   * failed `Future`.
   */
  def empty: Source[Nothing] = new Source[Nothing] {
    def head: Option[Nothing] = None
    def tail: Future[Source[Nothing]] = Future.exception(new Exception("Empty"))
  }

  /**
   * Exposes a standard-library `Stream` as a [[Source]] whose tails are
   * already-completed futures.
   *
   * @param s the stream to wrap; it is only forced as far as the source is consumed
   */
  def fromStream[T](s: Stream[T]): Source[T] = new Source[T] {
    def head: Option[T] = s.headOption

    // `s.tail` throws on an empty stream, and `Future.value` evaluates its
    // argument eagerly, so guard here and fail the Future the same way
    // `Source.empty` does instead of throwing synchronously.
    def tail: Future[Source[T]] =
      if (s.isEmpty) Future.exception(new Exception("Empty"))
      else Future.value(fromStream(s.tail))
  }
}
/**
 * The writing end of an asynchronous stream. Elements pushed into a sink
 * become visible to readers through the [[Source]] returned by `source`.
 */
trait Sink[T] {
  /**
   * Pushes `t` into the stream. Only put once on a given sink: the returned
   * sink is the one to put the next element into.
   */
  def put(t: T): Sink[T]

  /** Signals that no element will be put into this sink. */
  def finish: Unit

  /** The readable side of the stream, starting at this sink's position. */
  def source: Future[Source[T]]
}
/** Promise-backed implementation of [[Sink]]. */
object Sink {
  /** Creates a sink that has not been written to yet. */
  def empty[T]: Sink[T] = consumer(new Promise[(Option[T], Future[Source[T]])]())

  /**
   * Builds a sink on top of `promise`, which is fulfilled exactly once with
   * the pair (head, tail) of the source this sink feeds.
   */
  private def consumer[T](promise: Promise[(Option[T], Future[Source[T]])]): Sink[T] = new Sink[T] {
    def source: Future[Source[T]] = promise.map { case (first, rest) =>
      new Source[T] {
        def head: Option[T] = first
        def tail: Future[Source[T]] = rest
      }
    }

    def put(t: T): Sink[T] = {
      // The next sink's source becomes the tail of the element written here.
      val nextSink = empty[T]
      promise.setValue((Some(t), nextSink.source))
      nextSink
    }

    def finish: Unit = {
      // The end marker has no head. Its tail fails, mirroring `Source.empty`,
      // rather than pointing at a fresh promise that nothing could ever
      // complete (which would leave a consumer of the tail waiting forever).
      val ended: Future[Source[T]] = Future.exception(new Exception("Empty"))
      // Explicit tuple: `setValue` takes a single argument, so passing two
      // relied on deprecated argument auto-tupling.
      promise.setValue((None, ended))
    }
  }
}
/** Now play with it:
scala> Sink.empty[Int]
res10: Sink[Int] = Sink$$anon$1@2d51f840
scala> res10.put(10)
res11: Sink[Int] = Sink$$anon$1@6a9bbaed
scala> res11.put(11)
res12: Sink[Int] = Sink$$anon$1@26efec43
scala> res12.put(12)
res13: Sink[Int] = Sink$$anon$1@3f44c5bd
scala> res10.source.get.tail.get.tail.get.head.get
res14: Int = 12
*/
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment