Last active
December 27, 2015 23:49
-
-
Save ScrapCodes/7409282 to your computer and use it in GitHub Desktop.
Akka 2.2.3 and 2.1.x remoting difference.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor.{Props, ActorRef, Actor, ActorSystem} | |
import com.typesafe.config.ConfigFactory | |
import scala.util.Random | |
import scala.collection.mutable.LinkedList | |
import scala.reflect.ClassTag | |
import scala.collection.mutable | |
import akka.remote.RemoteActorRefProvider | |
/**
 * Message asking the publisher to start delivering messages to `receiverActor`.
 *
 * @param receiverActor the actor that wants to receive published messages
 */
case class SubscribeReceiver(receiverActor: ActorRef)

/**
 * Message asking the publisher to stop delivering messages to `receiverActor`.
 *
 * @param receiverActor the actor that no longer wants to receive published messages
 */
case class UnsubscribeReceiver(receiverActor: ActorRef)
/**
 * Entry point of the receiving side (Akka 2.1.x style remoting configuration).
 *
 * Starts a remoting-enabled actor system named "a" listening on 127.0.0.1:9998, creates a
 * [[ReceiverActor]] pointed at the remote `FeederActor`, and then blocks until the actor
 * system terminates.
 */
object Receiver {

  /**
   * Boots the actor system and the receiver actor.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("a", ConfigFactory.parseString(
      """
        |akka.daemonic = on
        |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
        |akka.remote.netty.hostname = "127.0.0.1"
        |akka.remote.transport = "akka.remote.netty.NettyRemoteTransport"
        |akka.remote.netty.connection-timeout = 600s
        |akka.remote.netty.port = 9998
      """.stripMargin))

    // Address of the publisher: actor system "a" on 127.0.0.1:9999, which is the system name,
    // hostname and port that Sender configures. The "system@host" part must be spelled out
    // literally for the path to be a valid remote actor path.
    val receiver = system.actorOf(
      Props(new ReceiverActor("akka://a@127.0.0.1:9999/user/FeederActor")),
      "ReceiverActor")
    println("Receiver started as:" + receiver)

    system.awaitTermination()
  }
}
/**
 * Actor that subscribes itself to a publisher when it starts, prints every message it
 * receives to standard output, and unsubscribes when it stops.
 *
 * @param urlOfPublisher actor path of the publisher to subscribe to
 * @tparam T not referenced in the body; an implicit `ClassTag` for it is required
 */
class ReceiverActor[T: ClassTag](urlOfPublisher: String) extends Actor {

  // Looked up on first use (in preStart) rather than while the actor is being constructed.
  private lazy val publisher = context.actorFor(urlOfPublisher)

  /** Announces this actor to the publisher. */
  override def preStart(): Unit = {
    publisher ! SubscribeReceiver(self)
  }

  /** Prints whatever arrives, regardless of its type. */
  def receive: Receive = {
    case anything => println(anything)
  }

  /** Tells the publisher to stop sending to this actor. */
  override def postStop(): Unit = {
    publisher ! UnsubscribeReceiver(self)
  }
}
/**
 * Publisher actor: keeps a list of subscribed receivers and sends each of them a freshly
 * generated message every 500 milliseconds.
 *
 * Message generation is driven by a scheduler tick delivered to this actor's own mailbox, so
 * `receivers` and `i` are only ever touched from inside `receive`. The timer is cancelled in
 * `postStop`, so nothing keeps running after the actor has stopped.
 */
class FeederActor extends Actor {
  import scala.concurrent.duration._
  import context.dispatcher // execution context on which the scheduler runs the tick

  /** Internal timer message; only ever sent by the scheduler to this actor itself. */
  private case object Tick

  val rand = new Random()
  var receivers: mutable.LinkedList[ActorRef] = new mutable.LinkedList[ActorRef]()
  val strings: Array[String] = Array("words ", "may ", "count ")
  var i = 0

  // First tick after 500 ms, then every 500 ms.
  private val ticker =
    context.system.scheduler.schedule(500.millis, 500.millis, self, Tick)

  /**
   * Builds the next message: two of the fixed words (picked via a random index `x` and its
   * mirror `2 - x`) followed by a running counter.
   *
   * @return the generated message text
   */
  def makeMessage(): String = {
    i += 1
    val x = rand.nextInt(3)
    strings(x) + strings(2 - x) + i
  }

  /** Stops the periodic tick together with the actor. */
  override def postStop(): Unit = {
    ticker.cancel()
  }

  def receive: Receive = {
    case Tick =>
      // A separate message is generated for every receiver.
      receivers.foreach(_ ! makeMessage())

    case SubscribeReceiver(receiverActor: ActorRef) =>
      println("received subscribe from %s".format(receiverActor.toString))
      receivers = mutable.LinkedList(receiverActor) ++ receivers

    case UnsubscribeReceiver(receiverActor: ActorRef) =>
      println("received unsubscribe from %s".format(receiverActor.toString))
      // Remove every occurrence, wherever it sits in the list. Compare with `!=` (ActorRef
      // equality) instead of reference identity, because the ref carried by the unsubscribe
      // message is not guaranteed to be the same object that was stored on subscribe.
      receivers = receivers.filter(_ != receiverActor)
  }
}
/**
 * Entry point of the publishing side (Akka 2.1.x style remoting configuration).
 *
 * Starts a remoting-enabled actor system named "a" listening on 127.0.0.1:9999, creates the
 * `FeederActor` under the name "FeederActor", and blocks until the actor system terminates.
 */
object Sender {

  /**
   * Boots the actor system and the feeder actor.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val remotingConfig = ConfigFactory.parseString(
      """
        |akka.daemonic = on
        |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
        |akka.remote.netty.hostname = "127.0.0.1"
        |akka.remote.transport = "akka.remote.netty.NettyRemoteTransport"
        |akka.remote.netty.connection-timeout = 600s
        |akka.remote.netty.port = 9999
      """.stripMargin)
    val system = ActorSystem("a", remotingConfig)

    val feederRef = system.actorOf(Props[FeederActor], "FeederActor")
    println("Feeder started as:" + feederRef)

    system.awaitTermination()
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor.{Props, ActorRef, Actor, ActorSystem} | |
import com.typesafe.config.ConfigFactory | |
import scala.util.Random | |
import scala.collection.mutable.LinkedList | |
import scala.reflect.ClassTag | |
import scala.collection.mutable | |
/**
 * Message asking the publisher to start delivering messages to `receiverActor`.
 *
 * @param receiverActor the actor that wants to receive published messages
 */
case class SubscribeReceiver(receiverActor: ActorRef)

/**
 * Message asking the publisher to stop delivering messages to `receiverActor`.
 *
 * @param receiverActor the actor that no longer wants to receive published messages
 */
case class UnsubscribeReceiver(receiverActor: ActorRef)
/**
 * Entry point of the receiving side (Akka 2.2.x style remoting configuration, `netty.tcp`).
 *
 * Starts a remoting-enabled actor system named "a" listening on 127.0.0.1:9998, creates a
 * [[ReceiverActor]] pointed at the remote `FeederActor`, and then blocks until the actor
 * system terminates.
 */
object Receiver {

  /**
   * Boots the actor system and the receiver actor.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // NOTE(review): `watch-failure-detector.threshold` is given a duration-style value here;
    // it looks like Akka expects a plain number for this key -- confirm against the Akka
    // remoting reference configuration.
    val system = ActorSystem("a", ConfigFactory.parseString(
      """
        |akka.daemonic = on
        |akka.remote.watch-failure-detector.acceptable-heartbeat-pause = 6000 s
        |akka.remote.watch-failure-detector.threshold = 6000 s
        |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
        |akka.remote.netty.tcp.connection-timeout = 6000 s
        |akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"
        |akka.remote.netty.tcp.host = "127.0.0.1"
        |akka.remote.netty.tcp.port = 9998
      """.stripMargin))

    // Address of the publisher: actor system "a" on 127.0.0.1:9999, which is the system name,
    // host and port that Sender configures. The "system@host" part must be spelled out
    // literally for the path to be a valid remote actor path.
    val receiver = system.actorOf(
      Props(new ReceiverActor("akka.tcp://a@127.0.0.1:9999/user/FeederActor")),
      "ReceiverActor")
    println("Receiver started as:" + receiver)

    system.awaitTermination()
  }
}
/**
 * Actor that subscribes itself to a publisher when it starts, prints every message it
 * receives to standard output, and unsubscribes when it stops.
 *
 * @param urlOfPublisher actor path of the publisher to subscribe to
 * @tparam T not referenced in the body; an implicit `ClassTag` for it is required
 */
class ReceiverActor[T: ClassTag](urlOfPublisher: String) extends Actor {

  // Created on first use (in preStart) rather than while the actor is being constructed.
  private lazy val publisher = context.actorSelection(urlOfPublisher)

  /** Announces this actor to the publisher. */
  override def preStart(): Unit = {
    publisher ! SubscribeReceiver(self)
  }

  /** Prints whatever arrives, regardless of its type. */
  def receive: Receive = {
    case anything => println(anything)
  }

  /** Tells the publisher to stop sending to this actor. */
  override def postStop(): Unit = {
    publisher ! UnsubscribeReceiver(self)
  }
}
/**
 * Publisher actor: keeps a list of subscribed receivers and sends each of them a freshly
 * generated message every 500 milliseconds.
 *
 * Message generation is driven by a scheduler tick delivered to this actor's own mailbox, so
 * `receivers` and `i` are only ever touched from inside `receive`. The timer is cancelled in
 * `postStop`, so nothing keeps running after the actor has stopped.
 */
class FeederActor extends Actor {
  import scala.concurrent.duration._
  import context.dispatcher // execution context on which the scheduler runs the tick

  /** Internal timer message; only ever sent by the scheduler to this actor itself. */
  private case object Tick

  val rand = new Random()
  var receivers: mutable.LinkedList[ActorRef] = new mutable.LinkedList[ActorRef]()
  val strings: Array[String] = Array("words ", "may ", "count ")
  var i = 0

  // First tick after 500 ms, then every 500 ms.
  private val ticker =
    context.system.scheduler.schedule(500.millis, 500.millis, self, Tick)

  /**
   * Builds the next message: two of the fixed words (picked via a random index `x` and its
   * mirror `2 - x`) followed by a running counter.
   *
   * @return the generated message text
   */
  def makeMessage(): String = {
    i += 1
    val x = rand.nextInt(3)
    strings(x) + strings(2 - x) + i
  }

  /** Stops the periodic tick together with the actor. */
  override def postStop(): Unit = {
    ticker.cancel()
  }

  def receive: Receive = {
    case Tick =>
      // A separate message is generated for every receiver.
      receivers.foreach(_ ! makeMessage())

    case SubscribeReceiver(receiverActor: ActorRef) =>
      println("received subscribe from %s".format(receiverActor.toString))
      receivers = mutable.LinkedList(receiverActor) ++ receivers

    case UnsubscribeReceiver(receiverActor: ActorRef) =>
      println("received unsubscribe from %s".format(receiverActor.toString))
      // Remove every occurrence, wherever it sits in the list. Compare with `!=` (ActorRef
      // equality) instead of reference identity, because the ref carried by the unsubscribe
      // message is not guaranteed to be the same object that was stored on subscribe.
      receivers = receivers.filter(_ != receiverActor)
  }
}
/**
 * Entry point of the publishing side (Akka 2.2.x style remoting configuration, `netty.tcp`).
 *
 * Starts a remoting-enabled actor system named "a" listening on 127.0.0.1:9999, creates the
 * `FeederActor` under the name "FeederActor", and blocks until the actor system terminates.
 */
object Sender {

  /**
   * Boots the actor system and the feeder actor.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // NOTE(review): `watch-failure-detector.threshold` is given a duration-style value here;
    // it looks like Akka expects a plain number for this key -- confirm against the Akka
    // remoting reference configuration.
    val remotingConfig = ConfigFactory.parseString(
      """
        |akka.daemonic = on
        |akka.remote.watch-failure-detector.acceptable-heartbeat-pause = 6000 s
        |akka.remote.watch-failure-detector.threshold = 6000 s
        |akka.remote.netty.tcp.connection-timeout = 6000 s
        |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
        |akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"
        |akka.remote.netty.tcp.host = "127.0.0.1"
        |akka.remote.netty.tcp.port = 9999
      """.stripMargin)
    val system = ActorSystem("a", remotingConfig)

    val feederRef = system.actorOf(Props[FeederActor], "FeederActor")
    println("Feeder started as:" + feederRef)

    system.awaitTermination()
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment