Skip to content

Instantly share code, notes, and snippets.

@ScrapCodes
Last active December 27, 2015 23:49
Show Gist options
  • Save ScrapCodes/7409282 to your computer and use it in GitHub Desktop.
Save ScrapCodes/7409282 to your computer and use it in GitHub Desktop.
Akka 2.2.3 and 2.1.x remoting difference.
import akka.actor.{Props, ActorRef, Actor, ActorSystem}
import com.typesafe.config.ConfigFactory
import scala.util.Random
import scala.collection.mutable.LinkedList
import scala.reflect.ClassTag
import scala.collection.mutable
import akka.remote.RemoteActorRefProvider
/** Message carrying a receiver's `ActorRef`; `ReceiverActor` sends it to the publisher from `preStart`. */
case class SubscribeReceiver(receiverActor: ActorRef)
/** Message carrying a receiver's `ActorRef`; `ReceiverActor` sends it to the publisher from `postStop`. */
case class UnsubscribeReceiver(receiverActor: ActorRef)
/**
 * Entry point for the receiving side, using the Akka 2.1.x remoting settings.
 *
 * Starts actor system "a" on 127.0.0.1:9998 and creates a `ReceiverActor`
 * that points at the `FeederActor` hosted by `Sender` on 127.0.0.1:9999.
 */
object Receiver {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("a", ConfigFactory.parseString(
      """
        |akka.daemonic = on
        |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
        |akka.remote.netty.hostname = "127.0.0.1"
        |akka.remote.transport = "akka.remote.netty.NettyRemoteTransport"
        |akka.remote.netty.connection-timeout = 600s
        |akka.remote.netty.port = 9998
        |""".stripMargin))
    // Remote actor path format is akka://<system>@<host>:<port>/user/<name>;
    // the values match the system name, hostname and port configured in Sender.
    val publisherPath = "akka://a@127.0.0.1:9999/user/FeederActor"
    val receiver = system.actorOf(Props(new ReceiverActor(publisherPath)), "ReceiverActor")
    println("Receiver started as:" + receiver)
    // Block the main thread until the actor system is shut down.
    system.awaitTermination()
  }
}
/**
 * Actor that subscribes itself to a publisher when started, prints every
 * message it receives, and unsubscribes itself when stopped.
 *
 * @param urlOfPublisher actor path of the publisher, looked up with `context.actorFor`
 */
class ReceiverActor[T: ClassTag](urlOfPublisher: String) extends Actor {

  // Lazy: the lookup happens on first use (the subscribe in preStart).
  private lazy val remotePublisher = context.actorFor(urlOfPublisher)

  override def preStart() = {
    remotePublisher ! SubscribeReceiver(context.self)
  }

  override def postStop() = {
    remotePublisher ! UnsubscribeReceiver(context.self)
  }

  def receive = {
    case incoming => println(incoming)
  }
}
/**
 * Publisher actor: every 500 ms a background thread sends a generated string
 * to each subscribed receiver.
 *
 * Receivers are added on `SubscribeReceiver` and removed on `UnsubscribeReceiver`.
 */
class FeederActor extends Actor {
  val rand = new Random()
  // Reassigned only inside `receive` and read by the feeder thread. The list is
  // always replaced by a new one (never mutated in place), so @volatile is enough
  // for the thread to see a consistent, up-to-date snapshot.
  @volatile var receivers: mutable.LinkedList[ActorRef] = new mutable.LinkedList[ActorRef]()
  val strings: Array[String] = Array("words ", "may ", "count ")
  var i = 0

  /** Builds the next message: two of the fixed words followed by a running counter. */
  def makeMessage(): String = {
    i += 1
    val x = rand.nextInt(3)
    strings(x) + strings(2 - x) + i
  }

  /*
   * A thread to generate random messages. It is created here but only started
   * in preStart, so `this` is not handed to another thread during construction.
   */
  private val feederThread = new Thread() {
    override def run(): Unit = {
      try {
        while (true) {
          Thread.sleep(500)
          receivers.foreach(_ ! makeMessage())
        }
      } catch {
        case _: InterruptedException => // interrupted by postStop: stop publishing
      }
    }
  }

  override def preStart(): Unit = feederThread.start()

  // Without this the thread would keep running after the actor has stopped.
  override def postStop(): Unit = feederThread.interrupt()

  def receive: Receive = {
    case SubscribeReceiver(receiverActor: ActorRef) =>
      println("received subscribe from %s".format(receiverActor.toString))
      receivers = mutable.LinkedList(receiverActor) ++ receivers
    case UnsubscribeReceiver(receiverActor: ActorRef) =>
      println("received unsubscribe from %s".format(receiverActor.toString))
      // Remove every occurrence, wherever it sits in the list. `==` is used rather
      // than `eq` because a ref that arrived over the wire is a different object
      // instance than the one stored at subscribe time.
      receivers = receivers.filterNot(_ == receiverActor)
  }
}
/**
 * Entry point for the publishing side, using the Akka 2.1.x remoting settings.
 *
 * Starts actor system "a" on 127.0.0.1:9999 and creates the `FeederActor`
 * under the name "FeederActor".
 */
object Sender {
  def main(args: Array[String]): Unit = {
    val remotingConfig = ConfigFactory.parseString(
      """
        |akka.daemonic = on
        |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
        |akka.remote.netty.hostname = "127.0.0.1"
        |akka.remote.transport = "akka.remote.netty.NettyRemoteTransport"
        |akka.remote.netty.connection-timeout = 600s
        |akka.remote.netty.port = 9999
        |""".stripMargin)
    val system = ActorSystem("a", remotingConfig)
    val feederRef = system.actorOf(Props[FeederActor], "FeederActor")
    println(s"Feeder started as:$feederRef")
    // Block the main thread until the actor system is shut down.
    system.awaitTermination()
  }
}
import akka.actor.{Props, ActorRef, Actor, ActorSystem}
import com.typesafe.config.ConfigFactory
import scala.util.Random
import scala.collection.mutable.LinkedList
import scala.reflect.ClassTag
import scala.collection.mutable
/** Message carrying a receiver's `ActorRef`; `ReceiverActor` sends it to the publisher from `preStart`. */
case class SubscribeReceiver(receiverActor: ActorRef)
/** Message carrying a receiver's `ActorRef`; `ReceiverActor` sends it to the publisher from `postStop`. */
case class UnsubscribeReceiver(receiverActor: ActorRef)
/**
 * Entry point for the receiving side, using the Akka 2.2.x remoting settings
 * (`akka.remote.netty.tcp.*` keys and `akka.tcp://` actor paths).
 *
 * Starts actor system "a" on 127.0.0.1:9998 and creates a `ReceiverActor`
 * that points at the `FeederActor` hosted by `Sender` on 127.0.0.1:9999.
 */
object Receiver {
  def main(args: Array[String]): Unit = {
    // The bind address key is `hostname`; a `host` key is not part of the
    // netty.tcp settings and would leave the default bind address in effect.
    // NOTE(review): watch-failure-detector.threshold looks like it should be a
    // unitless number rather than a duration ("6000 s") — confirm against reference.conf.
    val system = ActorSystem("a", ConfigFactory.parseString(
      """
        |akka.daemonic = on
        |akka.remote.watch-failure-detector.acceptable-heartbeat-pause = 6000 s
        |akka.remote.watch-failure-detector.threshold = 6000 s
        |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
        |akka.remote.netty.tcp.connection-timeout = 6000 s
        |akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"
        |akka.remote.netty.tcp.hostname = "127.0.0.1"
        |akka.remote.netty.tcp.port = 9998
        |""".stripMargin))
    // Remote actor path format is akka.tcp://<system>@<host>:<port>/user/<name>;
    // the values match the system name, hostname and port configured in Sender.
    val publisherPath = "akka.tcp://a@127.0.0.1:9999/user/FeederActor"
    val receiver = system.actorOf(Props(new ReceiverActor(publisherPath)), "ReceiverActor")
    println("Receiver started as:" + receiver)
    // Block the main thread until the actor system is shut down.
    system.awaitTermination()
  }
}
/**
 * Actor that subscribes itself to a publisher when started, prints every
 * message it receives, and unsubscribes itself when stopped.
 *
 * @param urlOfPublisher actor path of the publisher, addressed through `context.actorSelection`
 */
class ReceiverActor[T: ClassTag](urlOfPublisher: String) extends Actor {

  // Lazy: the selection is created on first use (the subscribe in preStart).
  private lazy val remotePublisher = context.actorSelection(urlOfPublisher)

  override def preStart() = {
    remotePublisher ! SubscribeReceiver(context.self)
  }

  override def postStop() = {
    remotePublisher ! UnsubscribeReceiver(context.self)
  }

  def receive = {
    case incoming => println(incoming)
  }
}
/**
 * Publisher actor: every 500 ms a background thread sends a generated string
 * to each subscribed receiver.
 *
 * Receivers are added on `SubscribeReceiver` and removed on `UnsubscribeReceiver`.
 */
class FeederActor extends Actor {
  val rand = new Random()
  // Reassigned only inside `receive` and read by the feeder thread. The list is
  // always replaced by a new one (never mutated in place), so @volatile is enough
  // for the thread to see a consistent, up-to-date snapshot.
  @volatile var receivers: mutable.LinkedList[ActorRef] = new mutable.LinkedList[ActorRef]()
  val strings: Array[String] = Array("words ", "may ", "count ")
  var i = 0

  /** Builds the next message: two of the fixed words followed by a running counter. */
  def makeMessage(): String = {
    i += 1
    val x = rand.nextInt(3)
    strings(x) + strings(2 - x) + i
  }

  /*
   * A thread to generate random messages. It is created here but only started
   * in preStart, so `this` is not handed to another thread during construction.
   */
  private val feederThread = new Thread() {
    override def run(): Unit = {
      try {
        while (true) {
          Thread.sleep(500)
          receivers.foreach(_ ! makeMessage())
        }
      } catch {
        case _: InterruptedException => // interrupted by postStop: stop publishing
      }
    }
  }

  override def preStart(): Unit = feederThread.start()

  // Without this the thread would keep running after the actor has stopped.
  override def postStop(): Unit = feederThread.interrupt()

  def receive: Receive = {
    case SubscribeReceiver(receiverActor: ActorRef) =>
      println("received subscribe from %s".format(receiverActor.toString))
      receivers = mutable.LinkedList(receiverActor) ++ receivers
    case UnsubscribeReceiver(receiverActor: ActorRef) =>
      println("received unsubscribe from %s".format(receiverActor.toString))
      // Remove every occurrence, wherever it sits in the list. `==` is used rather
      // than `eq` because a ref that arrived over the wire is a different object
      // instance than the one stored at subscribe time.
      receivers = receivers.filterNot(_ == receiverActor)
  }
}
/**
 * Entry point for the publishing side, using the Akka 2.2.x remoting settings
 * (`akka.remote.netty.tcp.*` keys).
 *
 * Starts actor system "a" on 127.0.0.1:9999 and creates the `FeederActor`
 * under the name "FeederActor".
 */
object Sender {
  def main(args: Array[String]): Unit = {
    // The bind address key is `hostname`; a `host` key is not part of the
    // netty.tcp settings and would leave the default bind address in effect,
    // so receivers addressing 127.0.0.1:9999 might not reach this system.
    // NOTE(review): watch-failure-detector.threshold looks like it should be a
    // unitless number rather than a duration ("6000 s") — confirm against reference.conf.
    val system = ActorSystem("a", ConfigFactory.parseString(
      """
        |akka.daemonic = on
        |akka.remote.watch-failure-detector.acceptable-heartbeat-pause = 6000 s
        |akka.remote.watch-failure-detector.threshold = 6000 s
        |akka.remote.netty.tcp.connection-timeout = 6000 s
        |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
        |akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"
        |akka.remote.netty.tcp.hostname = "127.0.0.1"
        |akka.remote.netty.tcp.port = 9999
        |""".stripMargin))
    val feeder = system.actorOf(Props[FeederActor], "FeederActor")
    println("Feeder started as:" + feeder)
    // Block the main thread until the actor system is shut down.
    system.awaitTermination()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment