Skip to content

Instantly share code, notes, and snippets.

@danellis
Created November 28, 2017 03:42
Show Gist options
  • Save danellis/14dfffba3e7d4563c1aede17af0cc5f5 to your computer and use it in GitHub Desktop.
Save danellis/14dfffba3e7d4563c1aede17af0cc5f5 to your computer and use it in GitHub Desktop.
Streams
@ val sources = List(Source(1 :: 2 :: 3 :: Nil), Source(4 :: 5 :: 6 :: Nil), Source(7 :: 8 :: 9 :: Nil))
sources: List[Source[Int, akka.NotUsed]] = List(
Source(SourceShape(StatefulMapConcat.out(666586581))),
Source(SourceShape(StatefulMapConcat.out(1638756960))),
Source(SourceShape(StatefulMapConcat.out(136416115)))
)
@ val futures = sources.map(_.runWith(Sink.seq))
futures: List[concurrent.Future[collection.immutable.Seq[Int]]] = List(
Future(Success(Vector(1, 2, 3))),
Future(Success(Vector(4, 5, 6))),
Future(Success(Vector(7, 8, 9)))
)
@ val seqSources = futures.map(Source.fromFuture)
seqSources: List[Source[collection.immutable.Seq[Int], akka.NotUsed]] = List(
Source(SourceShape(FutureSource.out(882120575))),
Source(SourceShape(FutureSource.out(309448115))),
Source(SourceShape(FutureSource.out(520681560)))
)
@ val source = seqSources.reduceLeft(_ concat _)
source: Source[collection.immutable.Seq[Int], akka.NotUsed] = Source(SourceShape(Concat.out(965190402) mapped to Concat.out(1911531884) mapped to Concat.out(1956760789)))
@ source.runWith(Sink.foreach(println))
Vector(1, 2, 3)
Vector(4, 5, 6)
Vector(7, 8, 9)
res22: concurrent.Future[akka.Done] = Future(Success(Done))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.