Created
February 6, 2017 07:50
-
-
Save NicolaeNMV/0f556315ae3e00d9031d04b4efd694fc to your computer and use it in GitHub Desktop.
Playing with reactive streams and ammonite
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env amm | |
import $ivy.`com.typesafe.akka::akka-stream:2.4.16` | |
import java.nio.file.Paths | |
import akka.NotUsed | |
import akka.stream._ | |
import akka.stream.scaladsl._ | |
import akka.actor.ActorSystem | |
import akka.util.ByteString | |
import scala.concurrent.{Await, Future} | |
import scala.concurrent.duration._ | |
// Actor system that hosts the stream; explicitly typed so implicit resolution
// does not depend on inferring the type of an implicit definition.
implicit val system: ActorSystem = ActorSystem("QuickStart")

// Materializer used to run stream graphs; it is created from the implicit
// actor system declared above.
implicit val materializer: ActorMaterializer = ActorMaterializer()
/** The author of a tweet, identified by handle. */
final case class Author(handle: String)

/** A hashtag token taken verbatim from a tweet body, leading `#` included. */
final case class Hashtag(name: String)

/** A single tweet: who wrote it, a numeric timestamp, and the raw text body. */
final case class Tweet(author: Author, timestamp: Long, body: String) {

  /** The distinct hashtags mentioned in `body`.
    *
    * The body is tokenised on any run of whitespace (spaces, tabs, newlines),
    * so tags on separate lines are not fused into one token. A token counts as
    * a hashtag only if it starts with `#` and has at least one more character;
    * a lone `#` is ignored.
    *
    * @return the set of hashtags, each keeping its leading `#`
    */
  def hashtags: Set[Hashtag] =
    body
      .split("\\s+")
      .collect { case token if token.startsWith("#") && token.length > 1 => Hashtag(token) }
      .toSet
}
// Sample tweet used as the single repeated element of the demo stream.
val tweet: Tweet =
  Tweet(author = Author("auth"), timestamp = 123L, body = "#blah #lol")

// Finite source: the sample tweet repeated, truncated to ten elements.
val tweets: Source[Tweet, NotUsed] =
  Source.repeat(tweet).take(10)

// Sink that prints every incoming string on its own line.
val sinkPrintingOutElements: Sink[String, Future[akka.Done]] =
  Sink.foreach[String](line => println(line))

// Sink that accepts authors and prints their handles; the materialized value
// of the flow side (NotUsed) is kept.
val writeAuthors: Sink[Author, NotUsed] =
  Flow[Author]
    .map(author => author.handle)
    .to(sinkPrintingOutElements)
// Broadcast every tweet to two branches: one prints the author's handle, the
// other prints each hashtag found in the tweet. The graph is only a blueprint
// until `run()` is called, so it is run explicitly here using the implicit
// materializer in scope.
RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
  import GraphDSL.Implicits._

  // Two-way fan-out: every tweet is delivered to both outputs.
  val bcast = b.add(Broadcast[Tweet](2))

  tweets ~> bcast.in
  // Branch 0: author handle (already a String, so no conversion is needed).
  bcast.out(0) ~> Flow[Tweet].map(_.author.handle) ~> sinkPrintingOutElements
  // Branch 1: one printed element per hashtag, rendered via Hashtag.toString.
  bcast.out(1) ~> Flow[Tweet].mapConcat(_.hashtags.toList).map(_.toString) ~> sinkPrintingOutElements
  ClosedShape
}).run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment