/** Reply message carrying the stream reference from which the requester can read `Int` elements. */
final case class OfferSource(ref: SourceRef[Int])

/** Request message asking the receiver to hand out a source; it is answered with an [[OfferSource]]. */
case object RequestSource
# Akka settings for the node that listens on port 2551.
akka {
  actor {
    provider = "cluster"
    debug {
      lifecycle = on
      unhandled = on
    }
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2551
    }
  }
  loglevel = "DEBUG"
  cluster {
    # Addresses have the form akka.tcp://<actor-system-name>@<host>:<port>;
    # the system name must match the one passed to ActorSystem(...) ("test").
    seed-nodes = [
      "akka.tcp://test@127.0.0.1:2551",
      "akka.tcp://test@127.0.0.1:2552"]
  }
}
/**
 * Entry point of the producing node: starts the "test" actor system and
 * registers the `producer` actor under `/user/producer`.
 */
object Producer extends App {
  implicit val system: ActorSystem = ActorSystem("test", ConfigFactory.load())

  /**
   * Answers every `RequestSource` with an `OfferSource` that wraps a stream
   * reference to a freshly started stream of the integers 1 to 1000.
   */
  class Producer extends Actor with ActorLogging {
    override def receive: Receive = {
      case RequestSource =>
        // `log` is created lazily by ActorLogging; resolve it here, on the actor's
        // own thread, so the stream stage below does not reach into actor state.
        val streamLog = log
        val sourceRef =
          Source(1 to 1000)
            .map { x =>
              streamLog.info(s"Giving $x...")
              x
            }
            // The stream-ref subscription timeout is raised to one minute.
            .runWith(
              StreamRefs.sourceRef().addAttributes(StreamRefAttributes.subscriptionTimeout(1.minute)))
        // `sender()` is evaluated now, while the request is still being processed.
        sourceRef.map(OfferSource(_)).pipeTo(sender())
    }
  }

  system.actorOf(Props[Producer], "producer")
}
# Akka settings for the node that listens on port 2552.
akka {
  actor {
    provider = "cluster"
    debug {
      lifecycle = on
      unhandled = on
    }
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2552
    }
  }
  loglevel = "DEBUG"
  cluster {
    # Addresses have the form akka.tcp://<actor-system-name>@<host>:<port>;
    # the system name must match the one passed to ActorSystem(...) ("test").
    seed-nodes = [
      "akka.tcp://test@127.0.0.1:2551",
      "akka.tcp://test@127.0.0.1:2552"]
  }
}
/**
 * Entry point of the consuming node: starts the "test" actor system and
 * registers the `consumer` actor, which immediately asks the producer for a source.
 */
object Consumer extends App {
  implicit val system: ActorSystem = ActorSystem("test", ConfigFactory.load())

  /**
   * Requests a source from the remote producer on start-up and, once an
   * `OfferSource` arrives, prints the offered elements at a throttled rate.
   */
  class Consumer extends Actor with ActorLogging {
    /** Self-sent trigger that starts the request to the producer. */
    case object GetSource

    override def receive: Receive = {
      case GetSource =>
        log.info("Requesting source...")
        // Full remote path: protocol, actor-system name ("test"), host:port, then the actor path.
        val producer = context.actorSelection("akka.tcp://test@127.0.0.1:2551/user/producer")
        producer ! RequestSource

      case OfferSource(sourceRef) =>
        log.info("Got source!")
        // Shaping mode: one element per 300 ms.
        sourceRef
          .throttle(1, 300.millis, 0, ThrottleMode.Shaping)
          .runForeach(println)
    }

    override def preStart(): Unit = self ! GetSource
  }

  system.actorOf(Props[Consumer], "consumer")
}