Last active
April 11, 2021 21:07
-
-
Save petitviolet/7de478c406b568b738471dd324ad80e5 to your computer and use it in GitHub Desktop.
Akka-Stream with Actor using ActorPublisher and ActorSubscriber
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package net.petitviolet.ex.persistence.task | |
import akka.NotUsed | |
import akka.actor._ | |
import akka.pattern.ask | |
import akka.stream.actor.ActorSubscriberMessage.{ OnComplete, OnNext } | |
import akka.stream.actor.{ ActorPublisher, ActorSubscriber, OneByOneRequestStrategy, RequestStrategy } | |
import akka.stream.{ ActorMaterializer, ClosedShape } | |
import akka.util.Timeout | |
import org.reactivestreams.Publisher | |
import scala.concurrent.Future | |
import scala.io.StdIn | |
private case class Letter(value: String) extends AnyVal | |
private case object Finish | |
private object AkkaStreamPracWithActor extends App { | |
import akka.stream.scaladsl._ | |
implicit val system = ActorSystem("akka-stream-prac") | |
implicit val executor = system.dispatcher | |
implicit val materializer = ActorMaterializer() | |
class PublishActor extends ActorPublisher[Letter] { | |
// publish [[Letter]] or OnComplete | |
override def receive: Actor.Receive = { | |
case s: String => | |
onNext(Letter(s"Nice: $s")) | |
case i: Int => | |
onNext(Letter(s"Great: ${i * 100}")) | |
case Finish => | |
onComplete() | |
} | |
} | |
class FlowActor extends Actor { | |
// subscribe and publish | |
override def receive: Actor.Receive = { | |
case Letter(msg) => sender() ! Letter(s"(Mapped: $msg)") | |
case any => println(s"??? => $any") | |
} | |
} | |
class SubscribeActor extends ActorSubscriber { | |
override protected def requestStrategy: RequestStrategy = OneByOneRequestStrategy | |
// just subscribe | |
override def receive: Actor.Receive = { | |
case OnNext(any) => println(s"subscribed: $any") | |
case OnComplete => println(s"finish process!") | |
} | |
} | |
// publisher actor | |
val actorRef = system.actorOf(Props[PublishActor]) | |
// source with actor | |
val source: Source[Letter, NotUsed] = { | |
val publisher: Publisher[Letter] = ActorPublisher(actorRef) | |
Source.fromPublisher(publisher) | |
} | |
// flow | |
val flow: Flow[Letter, Letter, NotUsed] = { | |
import scala.concurrent.duration._ | |
implicit val timeout: Timeout = 1.second | |
val flowActor = system.actorOf(Props[FlowActor]) | |
def flowWithActor(reply: Letter): Future[Letter] = (flowActor ? reply).mapTo[Letter] | |
Flow[Letter].mapAsync[Letter](3)(flowWithActor) | |
} | |
// simple implementation without actor | |
val _flow: Flow[Letter, Letter, NotUsed] = Flow[Letter].map { r => r.copy(value = s"(Mapped: ${r.value})") } | |
// another flow without actor | |
val accumulater: Flow[Letter, String, NotUsed] = | |
Flow[Letter].fold("init") { (acc, rep) => s"$acc :: ${rep.value}" } | |
// sink with actor | |
val sink: Sink[String, NotUsed] = { | |
val printActor = system.actorOf(Props[SubscribeActor]) | |
Sink.fromSubscriber[String](ActorSubscriber[String](printActor)) | |
} | |
// simple graph | |
val _graph: RunnableGraph[NotUsed] = | |
RunnableGraph.fromGraph(source via flow via accumulater to sink) | |
// written by DSL | |
val graph: RunnableGraph[NotUsed] = RunnableGraph.fromGraph { | |
GraphDSL.create() { implicit builder => | |
import GraphDSL.Implicits._ | |
source ~> flow ~> accumulater ~> sink | |
ClosedShape | |
} | |
} | |
graph.run | |
// wait preparing graph | |
Thread.sleep(100L) | |
actorRef ! "hello!" | |
actorRef ! 100 | |
actorRef ! "good" | |
actorRef ! Finish | |
println("push Enter to shutdown process.") | |
StdIn.readLine() | |
system.terminate() | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package net.petitviolet.ex.persistence.task | |
import akka.actor._ | |
import akka.stream.{ ActorMaterializer, OverflowStrategy } | |
import akka.{ Done, NotUsed } | |
import scala.concurrent.Future | |
import scala.io.StdIn | |
import scala.language.postfixOps | |
private case class Message(value: String) extends AnyVal | |
private object AkkaStreamStandard extends App { | |
import akka.stream.scaladsl._ | |
implicit val system = ActorSystem("akka-stream-prac") | |
implicit val executor = system.dispatcher | |
implicit val materializer = ActorMaterializer() | |
// source with actor | |
val source: Source[Message, SourceQueueWithComplete[Message]] = | |
Source.queue[Message](100, OverflowStrategy.backpressure) | |
// flow | |
val flow: Flow[Message, Message, NotUsed] = | |
Flow[Message].map { r => r.copy(value = s"(Mapped: ${r.value})") } | |
// another flow | |
val accumulater: Flow[Message, String, NotUsed] = | |
Flow[Message].fold("init") { (acc, rep) => s"$acc :: ${rep.value}" } | |
// sink just printing message | |
val sink: Sink[String, Future[Done]] = Sink.foreach[String] { println } | |
// simple graph | |
val graph: RunnableGraph[SourceQueueWithComplete[Message]] = | |
source via flow via accumulater to sink | |
// queue for publisher of graph | |
val queue: SourceQueueWithComplete[Message] = graph.run() | |
// wait preparing graph | |
Thread.sleep(100L) | |
queue offer Message("hello!") | |
queue offer Message("100") | |
queue offer Message("good") | |
queue complete | |
println("push Enter to shutdown process.") | |
StdIn.readLine() | |
system.terminate() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Great Example
thank you!