Skip to content

Instantly share code, notes, and snippets.

@mgodave
Last active June 8, 2016 23:22
Show Gist options
  • Save mgodave/e545b332bcd327fb585837af36229cbd to your computer and use it in GitHub Desktop.
Save mgodave/e545b332bcd327fb585837af36229cbd to your computer and use it in GitHub Desktop.
object AsyncStreams {

  /**
   * Combines several streams into one, round by round.
   *
   * Each round takes the next element of every remaining stream, emits those
   * elements in the order the streams were given, drops the streams that have
   * ended, and then continues with the tails of the rest. The result ends once
   * every input stream has ended.
   *
   * A round is gated on `Future.collect`, so nothing from a round is emitted
   * until every remaining stream has either produced its next element or
   * ended. NOTE(review): a failure in any stream presumably fails the whole
   * result via `Future.collect` -- confirm this is the desired behaviour.
   *
   * @tparam E element type of the streams
   * @param streams the streams to combine; may be empty
   * @return a stream of all elements of `streams`, grouped by round
   */
  def collect[E](streams: Seq[AsyncStream[E]]): AsyncStream[E] =
    if (streams.isEmpty) {
      // Base case: nothing left to read, so the combined stream ends here.
      AsyncStream.empty[E]
    } else {
      AsyncStream
        .fromFuture(Future.collect(streams.map(_.uncons)))
        .flatMap { round =>
          // `None` marks an exhausted stream; flattening drops those.
          val live = round.flatten
          val heads = live.map { case (head, _) => head }
          // The right-hand side of `++` is by-name, so the tails are only
          // forced (and the next round only started) after `heads` is consumed.
          AsyncStream.fromSeq(heads) ++ collect(live.map { case (_, tail) => tail() })
        }
    }
}
import com.twitter.concurrent.AsyncStream
import com.twitter.util.{Return, Try, Future}
object AsyncStreams {

  /**
   * Merges streams in lock-step rounds.
   *
   * Each round deconstructs every remaining stream with `uncons`, waits for
   * all of them via `Future.collect`, emits the heads in the order the streams
   * were given, and recurses on the tails of the streams that are not yet
   * exhausted. The result ends once every input stream has ended.
   *
   * NOTE(review): a failure in any stream presumably fails the whole result
   * via `Future.collect` -- confirm this is the desired behaviour.
   *
   * @tparam E element type of the streams
   * @param streams the streams to merge; may be empty
   * @return a stream of all elements of `streams`, grouped by round
   */
  def merge2[E](streams: Seq[AsyncStream[E]]): AsyncStream[E] =
    if (streams.isEmpty) {
      // Base case: without it the recursion never ends once every stream
      // has been exhausted, and the merged stream would never complete.
      AsyncStream.empty[E]
    } else {
      AsyncStream
        .fromFuture(Future.collect(streams.map(_.uncons)))
        .flatMap { round =>
          // `None` marks an exhausted stream; flattening drops those while
          // keeping the remaining (head, tail) pairs in stream order.
          val live = round.flatten
          val heads = live.map { case (head, _) => head }
          // The right-hand side of `++` is by-name, so the tails are only
          // forced after the heads of this round have been consumed.
          AsyncStream.fromSeq(heads) ++ merge2(live.map { case (_, tail) => tail() })
        }
    }

  /**
   * Merges streams by emitting elements as they become available.
   *
   * Every stream is deconstructed with `uncons`; `Future.select` picks one
   * completed deconstruction at a time. A produced element is emitted and the
   * tail of its stream is put back into the pending set. A stream that has
   * ended is removed from the pending set. A stream whose deconstruction
   * failed is removed as well: the failure is dropped, not propagated.
   * The result ends once no stream is pending.
   *
   * @tparam E element type of the streams
   * @param streams the streams to merge; may be empty
   * @return a stream of the elements of `streams` in completion order
   */
  def merge[E](streams: Seq[AsyncStream[E]]): AsyncStream[E] = {
    def inner(pending: Seq[Future[Option[(E, () => AsyncStream[E])]]]): AsyncStream[E] =
      if (pending.isEmpty) {
        // Base case: `Future.select` rejects an empty list, so the merged
        // stream must be ended explicitly instead of selecting over nothing.
        AsyncStream.empty[E]
      } else {
        AsyncStream.fromFuture(Future.select(pending)).flatMap {
          case (Return(Some((head, tail))), rest) =>
            // NOTE(review): the tail is prepended; if `Future.select` favours
            // earlier entries, an always-ready stream may be served first.
            AsyncStream(head) ++ inner(tail().uncons +: rest)
          case (_, rest) =>
            // Exhausted (`Return(None)`) or failed stream: continue without it.
            inner(rest)
        }
      }

    inner(streams.map(_.uncons))
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment