// Preview excerpt of the per-connection pipeline (fragment only: `source`, `sink`, `conn`,
// `logger`, `packetWriter` and `packetReader` are not defined in this excerpt).
// Outbound: packets from `source` are logged, encoded by `packetWriter`, logged again and
// pushed into `conn.flow`.
// Inbound: what `conn.flow` emits is logged, decoded by `packetReader`, logged again and
// delivered to `sink`.
source
.via(logger("Mqtt Output"))
.map(packetWriter)
.via(logger("Row Output"))
.via(conn.flow
.via(logger("Row Input"))
.map(packetReader))
.via(logger("Mqtt Input"))
.runWith(sink)
Last active
January 11, 2016 09:12
-
-
Save butaji/4922f41ecf1ef6f95165 to your computer and use it in GitHub Desktop.
Example of a declarative style for handling a TCP connection with state
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package example | |
import akka.actor.{ ActorRef, ActorSystem, Props } | |
import akka.stream.ActorMaterializer | |
import akka.stream.actor.ActorSubscriberMessage.{ OnComplete, OnError, OnNext } | |
import akka.stream.actor._ | |
import akka.stream.io.Framing | |
import akka.stream.scaladsl.{ Flow, Sink, Source, Tcp } | |
import akka.util.ByteString | |
import scala.concurrent.Future | |
import scala.util.{ Failure, Success } | |
/**
 * Root type of every protocol packet that travels through the stream.
 *
 * Sealed so that all packet kinds are declared in this file and the compiler can
 * check pattern matches over packets for exhaustiveness.
 */
sealed trait Packet

/** The packet kinds this example understands. */
object Mqtt {

  /** Connection request; decoded from an incoming frame that starts with "CONNECT". */
  case object Connect extends Packet

  /** Connection acknowledgement; encoded on the wire as "CONNACK". */
  case object Connack extends Packet
}
/**
 * Demo entry point: starts a TCP server that speaks a toy line-based MQTT dialect and a
 * client that sends three "CONNECT" lines to it, both inside one actor system.
 */
object DeclarativeMqtt {

  /**
   * Creates the actor system and launches the server and the client on 127.0.0.1:1883.
   *
   * NOTE: `server` returns before the bind has completed (it only registers a callback on
   * the binding future), so the client is started without waiting for the listening socket.
   */
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("ClientAndServer")
    val (address, port) = ("127.0.0.1", 1883)
    server(system, address, port)
    client(system, address, port)
  }

  /**
   * Sends three "CONNECT\n" frames over an outgoing TCP connection, concatenates every
   * byte received in response, prints the outcome and shuts the actor system down.
   *
   * @param system  actor system used to materialize the stream; shut down when the stream ends
   * @param address host to connect to
   * @param port    TCP port to connect to
   */
  def client(system: ActorSystem, address: String, port: Int): Unit = {
    implicit val sys: ActorSystem = system
    import system.dispatcher
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    val testInput = List.fill(3)(ByteString("CONNECT\n"))
    val result: Future[ByteString] = Source(testInput)
      .via(Tcp().outgoingConnection(address, port))
      .runFold(ByteString.empty) { (acc, in) => acc ++ in }

    result.onComplete {
      case Success(reply) =>
        println("Result: " + reply.utf8String)
        println("Shutting down client")
        system.shutdown()
      case Failure(e) =>
        println("Failure: " + e.getMessage)
        system.shutdown()
    }
  }

  /**
   * Binds a TCP server and, for every incoming connection, wires the connection to its own
   * [[StateActor]]: decoded inbound packets are fed to the actor (as a subscriber) and the
   * packets the actor publishes are encoded and written back to the socket.
   *
   * @param system  actor system used for the stream and the state actors; shut down if the
   *                bind fails
   * @param address interface to bind to
   * @param port    TCP port to listen on
   */
  def server(system: ActorSystem, address: String, port: Int): Unit = {
    implicit val sys: ActorSystem = system
    import system.dispatcher
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    // The helpers below do not depend on a particular connection, so they are built once.

    // Pass-through stage that prints every element with the given prefix.
    def logger[T](prefix: String) = Flow[T].map { msg =>
      println(s"$prefix > ${msg}")
      msg
    }

    // Packet -> wire bytes. Only Connack is ever sent by the server; anything else fails
    // the stream with a descriptive error instead of a bare MatchError.
    val packetWriter: Packet => ByteString = {
      case Mqtt.Connack => ByteString("CONNACK")
      case other =>
        throw new IllegalArgumentException(s"Cannot encode packet: $other")
    }

    // Wire frame -> packet. Frames that are not a CONNECT fail the stream with a
    // descriptive error instead of a bare MatchError.
    val packetReader: ByteString => Packet = { frame =>
      if (frame.utf8String.startsWith("CONNECT")) Mqtt.Connect
      else throw new IllegalArgumentException(s"Unsupported frame: ${frame.utf8String}")
    }

    val handler = Sink.foreach[Tcp.IncomingConnection] { conn =>
      println("Client connected from: " + conn.remoteAddress)

      // One state actor per connection: the `connected` flag is per-connection state, an
      // ActorPublisher supports a single subscriber, and the actor stops itself when its
      // stream completes or fails, so it cannot be shared between connections.
      val actor: ActorRef = system.actorOf(Props[StateActor])
      val publisher = ActorPublisher[Packet](actor)
      val subscriber = ActorSubscriber[Packet](actor)
      val source: Source[Packet, Unit] = Source.fromPublisher(publisher)
      val sink: Sink[Packet, Unit] = Sink.fromSubscriber(subscriber)

      // Inbound half: raw bytes -> newline-delimited frames (max 100 bytes) -> packets.
      val inbound = conn.flow
        .via(logger("Row Input"))
        .via(Framing.delimiter(ByteString("\n"), 100))
        .map(packetReader)

      source
        .via(logger("Mqtt Output"))
        .map(packetWriter)
        .via(logger("Row Output"))
        .via(inbound)
        .via(logger("Mqtt Input"))
        .runWith(sink)
    }

    val connections = Tcp().bind(address, port)
    val binding = connections.to(handler).run()

    binding.onComplete {
      case Success(b) =>
        println("Server started, listening on: " + b.localAddress)
      case Failure(e) =>
        println(s"Server could not bind to $address:$port: ${e.getMessage}")
        system.shutdown()
    }
  }
}
/**
 * Holds the connection state for one stream. It is both ends of the stream at once:
 * as an ActorSubscriber it receives decoded packets (OnNext / OnComplete / OnError),
 * and as an ActorPublisher it emits the reply packets.
 *
 * The first Connect is answered with a Connack; a second Connect fails the published
 * stream with "Already connected!" and stops the actor.
 */
class StateActor extends ActorSubscriber with ActorPublisher[Packet] {

  // Ask upstream for one element at a time.
  override protected def requestStrategy: RequestStrategy = OneByOneRequestStrategy

  var connected = false

  // Replies that could not be published yet because downstream had no outstanding demand.
  private var pending = Vector.empty[Packet]

  // Publishes buffered replies for as long as downstream demand allows. Calling onNext
  // without demand throws IllegalStateException, hence the totalDemand guard.
  private def deliverPending(): Unit =
    while (isActive && totalDemand > 0 && pending.nonEmpty) {
      val next = pending.head
      pending = pending.tail
      onNext(next)
    }

  def receive = {
    case OnNext(Mqtt.Connect) =>
      println("State got Connect connected: " + connected)
      if (connected) onErrorThenStop(new Throwable("Already connected!"))
      else {
        connected = true
        pending :+= Mqtt.Connack
        deliverPending()
      }

    case OnComplete =>
      println("State got OnComplete")
      onCompleteThenStop()

    case OnError(err) =>
      println("State got OnError " + err)
      onErrorThenStop(err)

    case request: ActorPublisherMessage.Request =>
      // New downstream demand: log it as before and flush anything that was waiting for it.
      println("State got " + request)
      deliverPending()

    case ActorPublisherMessage.Cancel =>
      // Downstream is gone, nothing can be published any more: release the upstream
      // subscription and stop instead of lingering forever.
      println("State got " + ActorPublisherMessage.Cancel)
      cancel()
      context.stop(self)

    case other =>
      println("State got " + other)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment