Skip to content

Instantly share code, notes, and snippets.

@butaji
Last active January 11, 2016 09:12
Show Gist options
  • Save butaji/4922f41ecf1ef6f95165 to your computer and use it in GitHub Desktop.
Save butaji/4922f41ecf1ef6f95165 to your computer and use it in GitHub Desktop.
Example of a declarative style of handling a TCP connection with state
  // Excerpt of the per-connection pipeline from the full listing below: packets
  // from `source` are encoded, sent through the TCP connection, and whatever the
  // connection produces is decoded and handed to `sink`.
  // NOTE(review): the full listing additionally places a Framing.delimiter stage
  // between the "Row Input" logger and packetReader; this excerpt omits it.
  source
    // Log each outgoing packet, then encode it to bytes.
    .via(logger("Mqtt Output"))
    .map(packetWriter)
    .via(logger("Row Output"))
    // conn.flow: the encoded bytes go in, the bytes received on the connection come out.
    .via(conn.flow
      .via(logger("Row Input"))
      .map(packetReader))
    // Log each decoded incoming packet and deliver it to the sink.
    .via(logger("Mqtt Input"))
    .runWith(sink)
package example
import akka.actor.{ ActorRef, ActorSystem, Props }
import akka.stream.ActorMaterializer
import akka.stream.actor.ActorSubscriberMessage.{ OnComplete, OnError, OnNext }
import akka.stream.actor._
import akka.stream.io.Framing
import akka.stream.scaladsl.{ Flow, Sink, Source, Tcp }
import akka.util.ByteString
import scala.concurrent.Future
import scala.util.{ Failure, Success }
/** Common supertype of every protocol message that flows through the example streams. */
trait Packet

/** The two protocol messages this example exchanges. */
object Mqtt {

  /** Connection request; produced when an incoming frame starts with "CONNECT". */
  case object Connect extends Packet

  /** Connection acknowledgement; written to the wire as "CONNACK". */
  case object Connack extends Packet
}
/**
 * Runnable demo: starts a TCP server whose per-connection protocol state lives in a
 * [[StateActor]], then runs a client against it that sends three "CONNECT" lines.
 */
object DeclarativeMqtt {

  /** Maximum accepted length of one newline-delimited frame; longer frames fail the stream. */
  private val MaxFrameLength = 100

  /**
   * Starts the server and the client on 127.0.0.1:1883 inside one actor system.
   *
   * NOTE(review): `server` binds asynchronously and returns immediately, so the client
   * may try to connect before the socket is bound; in that case the client reports a
   * failure and shuts the system down.
   */
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("ClientAndServer")
    val (address, port) = ("127.0.0.1", 1883)
    server(system, address, port)
    client(system, address, port)
  }

  /**
   * Connects to `address:port`, sends three "CONNECT\n" frames, concatenates everything
   * the server sends back, prints the outcome and shuts the actor system down.
   *
   * @param system  actor system used to run the stream; shut down when the stream ends
   * @param address host to connect to
   * @param port    TCP port to connect to
   */
  def client(system: ActorSystem, address: String, port: Int): Unit = {
    implicit val sys = system
    import system.dispatcher
    implicit val materializer = ActorMaterializer()

    val testInput = List.fill(3)(ByteString("CONNECT\n"))

    val result: Future[ByteString] = Source(testInput)
      .via(Tcp().outgoingConnection(address, port))
      .runFold(ByteString.empty) { (acc, in) => acc ++ in }

    result.onComplete { outcome =>
      outcome match {
        case Success(reply) =>
          println("Result: " + reply.utf8String)
          println("Shutting down client")
        case Failure(e) =>
          println("Failure: " + e.getMessage)
      }
      // The demo is over either way, so the system is stopped on success and on failure.
      system.shutdown()
    }
  }

  /**
   * Binds a TCP server on `address:port`. Every accepted connection gets its own
   * [[StateActor]], which acts as both the sink for decoded incoming packets and the
   * source of outgoing packets.
   *
   * @param system  actor system used to run the streams; shut down if binding fails
   * @param address interface to bind to
   * @param port    TCP port to bind to
   */
  def server(system: ActorSystem, address: String, port: Int): Unit = {
    implicit val sys = system
    import system.dispatcher
    implicit val materializer = ActorMaterializer()

    // Pass-through stage that prints every element with the given prefix.
    def logger[T](prefix: String) = Flow[T].map { msg =>
      println(s"$prefix > ${msg}")
      msg
    }

    // Encodes an outgoing packet. Only Connack is ever sent by the server; anything
    // else is reported explicitly instead of surfacing as a bare MatchError.
    val packetWriter: Packet => ByteString = {
      case Mqtt.Connack => ByteString("CONNACK")
      case other        => throw new IllegalArgumentException(s"Cannot encode packet: $other")
    }

    // Decodes one frame (delimiter already stripped). Unknown frames fail the
    // connection's stream with a message that names the offending input.
    val packetReader: ByteString => Packet = { frame =>
      val text = frame.utf8String
      if (text.startsWith("CONNECT")) Mqtt.Connect
      else throw new IllegalArgumentException(s"Unsupported frame: $text")
    }

    val handler = Sink.foreach[Tcp.IncomingConnection] { conn =>
      println("Client connected from: " + conn.remoteAddress)

      // One state actor per connection: an ActorPublisher accepts a single subscriber
      // and the actor stops itself when its stream ends, so sharing one instance
      // across connections would break every connection after the first.
      val stateActor: ActorRef = system.actorOf(Props[StateActor])
      val source: Source[Packet, Unit] = Source.fromPublisher(ActorPublisher[Packet](stateActor))
      val sink: Sink[Packet, Unit] = Sink.fromSubscriber(ActorSubscriber[Packet](stateActor))

      source
        .via(logger("Mqtt Output"))
        .map(packetWriter)
        .via(logger("Row Output"))
        .via(conn.flow
          .via(logger("Row Input"))
          .via(Framing.delimiter(ByteString("\n"), MaxFrameLength))
          .map(packetReader))
        .via(logger("Mqtt Input"))
        .runWith(sink)
    }

    val connections = Tcp().bind(address, port)
    val binding = connections.to(handler).run()

    binding.onComplete {
      case Success(b) =>
        println("Server started, listening on: " + b.localAddress)
      case Failure(e) =>
        println(s"Server could not bind to $address:$port: ${e.getMessage}")
        system.shutdown()
    }
  }
}
/**
 * Holds the protocol state of one connection. As a subscriber it receives decoded
 * packets; as a publisher it emits the replies.
 *
 * The first `Connect` is answered with `Connack`; a second `Connect` fails the
 * outgoing stream with "Already connected!" and stops the actor.
 */
class StateActor extends ActorSubscriber with ActorPublisher[Packet] {

  override protected def requestStrategy: RequestStrategy = OneByOneRequestStrategy

  /** True once the first Connect has been accepted. */
  var connected = false

  // Replies that could not be published yet because downstream had no outstanding demand.
  private var outbox = Vector.empty[Packet]

  // Publishes queued replies while downstream demand allows it. Calling onNext with
  // zero demand throws, so every emission goes through this guard.
  private def deliver(): Unit =
    while (isActive && totalDemand > 0 && outbox.nonEmpty) {
      onNext(outbox.head)
      outbox = outbox.tail
    }

  def receive: Receive = {
    case OnNext(Mqtt.Connect) =>
      println("State got Connect connected: " + connected)
      if (connected) onErrorThenStop(new Throwable("Already connected!"))
      else {
        connected = true
        outbox :+= Mqtt.Connack
        deliver()
      }

    case request: ActorPublisherMessage.Request =>
      println("State got " + request)
      // New demand arrived: flush anything that was waiting for it.
      deliver()

    case ActorPublisherMessage.Cancel =>
      println("State got " + ActorPublisherMessage.Cancel)
      // Downstream is gone, nothing can be published any more; stop instead of leaking.
      context.stop(self)

    case OnComplete =>
      println("State got OnComplete")
      onCompleteThenStop()

    case OnError(err) =>
      println("State got OnError " + err)
      onErrorThenStop(err)

    case x =>
      println("State got " + x)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment