Skip to content

Instantly share code, notes, and snippets.

@searler
Created June 4, 2015 21:45
Show Gist options
  • Save searler/8b6db30d443be44b0e79 to your computer and use it in GitHub Desktop.
Save searler/8b6db30d443be44b0e79 to your computer and use it in GitHub Desktop.
Attempt to redirect Akka reactive stream Flow via actors
package actors
import scala.collection.immutable.Seq
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.Props
import akka.actor.actorRef2Scala
import akka.stream.ActorFlowMaterializer
import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.Cancel
import akka.stream.actor.ActorPublisherMessage.Request
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import akka.stream.actor.ActorSubscriber
import akka.stream.actor.OneByOneRequestStrategy
import akka.stream.actor.ActorSubscriberMessage._
/**
 * Demo application that redirects a stream through plain actors.
 *
 * Elements entering the flow are consumed by a [[CentralFlowed.Subscribed]] actor, which
 * hands them to a central [[CentralFlowed.Master]] actor. The master forwards them to every
 * registered [[CentralFlowed.Published]] actor, which re-emits them downstream.
 */
object CentralFlowed extends App {

  /** Sent by a [[Published]] actor to the master on start so the master can route to it. */
  case class SourceRef(source: ActorRef)

  /** Sent by the master to every registered publisher once the upstream side has finished. */
  case object Bye

  /**
   * Stream source backed by an actor. Every message that is not `Request`, `Cancel`
   * or [[Bye]] is treated as a stream element.
   *
   * Elements that arrive while there is no downstream demand are kept in a FIFO buffer
   * and emitted as demand arrives, so nothing is overwritten or lost. After [[Bye]] the
   * stream is completed as soon as the buffer has been drained.
   *
   * @param master actor that receives this publisher's [[SourceRef]] registration
   */
  class Published(master: ActorRef) extends ActorPublisher[Any] with ActorLogging {

    // Elements received but not yet emitted, oldest first.
    private var pending = Vector.empty[Any]

    // True once Bye was received; completion is deferred until `pending` is empty.
    private var finishing = false

    override def preStart() = {
      super.preStart()
      master ! SourceRef(self)
    }

    /** Emits buffered elements while demand lasts, then completes if [[Bye]] was seen. */
    private def drain(): Unit = {
      // onNext is only legal while the publisher is active and demand is outstanding.
      while (isActive && totalDemand > 0 && pending.nonEmpty) {
        onNext(pending.head)
        pending = pending.tail
      }
      if (finishing && pending.isEmpty && isActive) {
        log.info("completing")
        onComplete()
      }
    }

    def receive = {
      case Request(_) =>
        log.info(s"request $pending $isActive $isCompleted")
        drain()
      case Cancel =>
        log.info("cancel!!!!!!!!!!!!!!!!!!!!!!!!!!")
        context.stop(self)
      case Bye =>
        log.info("bye")
        finishing = true
        drain()
      case x =>
        log.info(s"set $x")
        // Once the stream is no longer active nothing can be emitted, so the element is dropped.
        if (isActive) {
          pending = pending :+ x
          drain()
        }
    }
  }

  /**
   * Stream sink backed by an actor. Forwards each received element to `master`, and
   * reports both normal completion and failure to it as `OnComplete` before stopping.
   *
   * @param master actor that receives the elements and the completion signal
   */
  class Subscribed(master: ActorRef) extends ActorSubscriber with ActorLogging {

    // Request one element at a time from upstream.
    override val requestStrategy = OneByOneRequestStrategy

    def receive = {
      case OnNext(x) => master ! x
      case x @ (OnComplete | OnError(_)) =>
        log.info(s"stopped $x")
        master ! OnComplete
        context.stop(self)
    }
  }

  /**
   * Central router. Registers publishers announced via [[SourceRef]] and fans every other
   * message out to all of them. `OnComplete` is translated into [[Bye]] for each publisher.
   *
   * Elements that arrive before any publisher has registered are kept in a backlog and
   * handed to the first publisher that registers. A completion that arrives early is
   * remembered, so publishers registering afterwards still receive [[Bye]].
   */
  class Master extends Actor with ActorLogging {

    var sources: Seq[ActorRef] = Vector()

    // Elements received while `sources` was still empty, oldest first.
    private var backlog = Vector.empty[Any]

    // True once OnComplete was received.
    private var upstreamDone = false

    def receive = {
      case OnComplete =>
        log.info(s"complete $sources")
        upstreamDone = true
        sources foreach { _ ! Bye }
      case SourceRef(ref) =>
        sources = sources :+ ref
        log.info(s"source $sources")
        // The backlog is only non-empty while no source was registered, so flush it here.
        backlog foreach { ref ! _ }
        backlog = Vector.empty
        if (upstreamDone) ref ! Bye
      case x =>
        if (sources.isEmpty) {
          log.info(s"buffered $x until a source registers")
          backlog = backlog :+ x
        } else {
          log.info(s"passed $x to $sources")
          sources foreach { _ ! x }
        }
    }
  }

  implicit val system: ActorSystem = ActorSystem("Sys")
  import system.dispatcher
  implicit val materializer: ActorFlowMaterializer = ActorFlowMaterializer()

  val master: ActorRef = system.actorOf(Props[Master])

  // A flow whose input side is the Subscribed actor and whose output side is the
  // Published actor; the two are only connected through `master`.
  val f = Flow.wrap(
    Sink.actorSubscriber(Props(new Subscribed(master))),
    Source.actorPublisher(Props(new Published(master))))((_, _) => ())

  // `_2` is the materialized value of Sink.foreach; shut the system down once it finishes.
  f.runWith(Source.apply(Seq("A", "B")), Sink.foreach { println })
    ._2
    .onComplete { outcome =>
      outcome match {
        case scala.util.Failure(cause) => println(cause)
        case _ => println("shutdown")
      }
      system.shutdown()
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment