Created
June 4, 2015 21:45
-
-
Save searler/8b6db30d443be44b0e79 to your computer and use it in GitHub Desktop.
Attempt to redirect Akka reactive stream Flow via actors
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package actors | |
import scala.collection.immutable.Seq | |
import akka.actor.Actor | |
import akka.actor.ActorLogging | |
import akka.actor.ActorRef | |
import akka.actor.ActorSystem | |
import akka.actor.Props | |
import akka.actor.actorRef2Scala | |
import akka.stream.ActorFlowMaterializer | |
import akka.stream.actor.ActorPublisher | |
import akka.stream.actor.ActorPublisherMessage.Cancel | |
import akka.stream.actor.ActorPublisherMessage.Request | |
import akka.stream.scaladsl.Flow | |
import akka.stream.scaladsl.Sink | |
import akka.stream.scaladsl.Source | |
import akka.stream.actor.ActorSubscriber | |
import akka.stream.actor.OneByOneRequestStrategy | |
import akka.stream.actor.ActorSubscriberMessage._ | |
/**
 * Demo that reroutes an Akka Streams `Flow` through plain actors.
 *
 * The flow's sink side is a `Subscribed` actor and its source side is a
 * `Published` actor; a `Master` actor sits between them and forwards every
 * message it gets from the former to the latter. Running the object pushes
 * "A" and "B" through that detour, prints whatever comes out of the flow and
 * then shuts the actor system down.
 */
object CentralFlowed extends App {

  /** Sent by a `Published` actor to the master to register itself as a forwarding target. */
  final case class SourceRef(source: ActorRef)

  /** Sent by the master to every registered source once the subscriber side has terminated. */
  case object Bye

  /**
   * Source side of the detour: emits every message it receives (other than
   * the stream control messages and `Bye`) to its stream subscriber.
   *
   * Messages that arrive while there is no outstanding demand are queued in
   * arrival order instead of overwriting each other, and `Bye` completes the
   * stream only after that queue has been emitted.
   *
   * @param master actor this publisher registers with on start
   */
  class Published(master: ActorRef) extends ActorPublisher[Any] with ActorLogging {

    // Elements waiting for demand, oldest first. Nothing bounds this queue:
    // the actor hop does not carry back-pressure to the subscriber side.
    private var buffer: Vector[Any] = Vector.empty

    // True once Bye has been received; completion waits for the buffer to empty.
    private var completing = false

    override def preStart(): Unit = {
      super.preStart()
      master ! SourceRef(self)
    }

    def receive: Receive = {
      case Request(_) =>
        log.info(s"request $buffer $isActive $isCompleted")
        drain()
      case Cancel =>
        log.info("cancel!!!!!!!!!!!!!!!!!!!!!!!!!!")
        context.stop(self)
      case Bye =>
        log.info("bye")
        completing = true
        drain()
      case x =>
        log.info(s"set $x")
        // Once the stream is no longer active nothing can be emitted, so the
        // message is only logged.
        if (isActive) {
          buffer = buffer :+ x
          drain()
        }
    }

    /**
     * Emits as many buffered elements as the current demand allows and, if
     * `Bye` was seen and nothing is left to emit, completes the stream and
     * stops this actor.
     */
    private def drain(): Unit =
      if (isActive) {
        val emittable = math.min(totalDemand, buffer.size.toLong).toInt
        val (ready, rest) = buffer.splitAt(emittable)
        buffer = rest
        ready foreach onNext
        if (completing && buffer.isEmpty) {
          onComplete()
          context.stop(self)
        }
      }
  }

  /**
   * Sink side of the detour: hands every stream element to `master` and
   * reports the end of the stream to it.
   *
   * @param master actor that receives the elements and the termination signal
   */
  class Subscribed(master: ActorRef) extends ActorSubscriber with ActorLogging {

    override val requestStrategy = OneByOneRequestStrategy

    def receive: Receive = {
      case OnNext(x) => master ! x
      // A failed upstream is reported to the master exactly like a normal
      // completion; the error itself only shows up in the log line.
      case x @ (OnComplete | OnError(_)) =>
        log.info(s"stopped $x")
        master ! OnComplete
        context.stop(self)
    }
  }

  /**
   * Hub between the two stream ends: remembers every actor announced via
   * `SourceRef`, forwards all other messages to each of them, and translates
   * `OnComplete` into `Bye`.
   *
   * Messages that arrive before the first `SourceRef` are held back and
   * replayed, in order, as soon as a source registers, so they are not lost
   * when registration and the first elements race each other.
   */
  class Master extends Actor with ActorLogging {

    var sources: Seq[ActorRef] = Vector.empty

    // Messages received while no source was registered yet, oldest first.
    private var pending: Vector[Any] = Vector.empty

    def receive: Receive = {
      case SourceRef(ref) =>
        sources = sources :+ ref
        log.info(s"source $sources")
        if (pending.nonEmpty) {
          val backlog = pending
          pending = Vector.empty
          backlog foreach dispatch
        }
      case msg if sources.isEmpty =>
        log.info(s"buffered $msg until a source registers")
        pending = pending :+ msg
      case msg =>
        dispatch(msg)
    }

    /** Delivers one message to all registered sources, mapping `OnComplete` to `Bye`. */
    private def dispatch(msg: Any): Unit = msg match {
      case OnComplete =>
        log.info(s"complete $sources")
        sources foreach { _ ! Bye }
      case x =>
        log.info(s"passed $x to $sources")
        sources foreach { _ ! x }
    }
  }

  implicit val system = ActorSystem("Sys")
  import system.dispatcher
  implicit val materializer = ActorFlowMaterializer()

  val master: ActorRef = system.actorOf(Props[Master])

  // A flow whose input ends in the Subscribed actor and whose output starts at
  // the Published actor; the two materialized values are discarded.
  val f = Flow.wrap(
    Sink.actorSubscriber(Props(new Subscribed(master))),
    Source.actorPublisher(Props(new Published(master))))((_, _) => ())

  // `._2` is the materialized value of the printing sink; the actor system is
  // shut down when it completes, whether it succeeded or failed.
  f.runWith(Source(Seq("A", "B")), Sink.foreach(println))
    ._2
    .onComplete { outcome =>
      outcome match {
        case scala.util.Failure(cause) => println(cause)
        case _ => println("shutdown")
      }
      system.shutdown()
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment