Created
July 18, 2015 20:58
-
-
Save searler/bbf8ea7b8d9e486c3a9d to your computer and use it in GitHub Desktop.
Simple request/response TCP client using AKKA streams and FSM
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package io | |
import scala.concurrent.duration.DurationInt | |
import scala.util.Success | |
import akka.actor.ActorRef | |
import akka.actor.ActorSystem | |
import akka.actor.FSM | |
import akka.actor.PoisonPill | |
import akka.actor.Props | |
import akka.pattern.ask | |
import akka.stream.ActorMaterializer | |
import akka.stream.OverflowStrategy | |
import akka.stream.scaladsl.Sink | |
import akka.stream.scaladsl.Source | |
import akka.stream.scaladsl.Tcp | |
import akka.util.ByteString | |
import akka.util.Timeout | |
/**
 * Interactive TCP client for 127.0.0.1:9999 that follows a request/response pattern.
 *
 * Every line read from standard input is sent, newline-terminated, to a
 * `ClientActor` using the ask pattern. When the resulting future completes, the
 * reply bytes are printed as UTF-8 text; any other outcome (timeout, failure,
 * unexpected reply type) is printed as-is.
 */
object FSMClienter extends App {
  implicit val system: ActorSystem = ActorSystem("Sys")
  import system.dispatcher
  // Not used by the loop below: resolved implicitly when ClientActor runs its stream.
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  val client: ActorRef = system.actorOf(Props[ClientActor])
  implicit val timeout: Timeout = Timeout(15.seconds)

  // readLine() yields null once standard input is exhausted; stop reading at that
  // point instead of sending "null\n" to the server in an endless loop.
  Iterator
    .continually(scala.io.StdIn.readLine())
    .takeWhile(_ != null)
    .foreach { line =>
      (client ? s"$line\n").onComplete {
        case Success(reply: ByteString) => println(reply.utf8String)
        case other                      => println(other)
      }
    }
  // NOTE(review): the actor system is intentionally left running here so replies
  // that are still in flight can be printed. Shut it down explicitly (using the
  // termination API of the Akka version in use) if the process must exit at EOF.

  /** States of the client state machine. */
  sealed trait State
  /** No TCP stream exists yet, or the previous one has been dropped. */
  case object Disconnected extends State
  /** A TCP stream exists and no request is outstanding. */
  case object Connected extends State
  /** A request has been written and its response has not arrived yet. */
  case object AwaitResult extends State

  /** State data carried by the client state machine. */
  sealed trait Data
  /** Data used while `Disconnected`. */
  case object Empty extends Data
  /** Handle on a running TCP stream: `sink` is the actor that accepts outgoing bytes. */
  case class Connection(sink: ActorRef) extends Data
  /** An outstanding request: the connection it was sent on and the actor awaiting the reply. */
  case class Transaction(connection: Connection, requestor: ActorRef) extends Data

  /**
   * FSM actor that forwards `String` requests over a lazily created TCP stream
   * and hands the next `ByteString` received from that stream to the requestor.
   *
   * There is no framing: the first chunk of bytes that arrives while a request
   * is outstanding is treated as the complete response.
   */
  class ClientActor extends FSM[State, Data] {
    startWith(Disconnected, Empty)

    when(Disconnected) {
      // First request: open the connection on demand, then send.
      case Event(request: String, Empty) =>
        val connection = makeConnection()
        connection.sink ! ByteString(request)
        goto(AwaitResult) using Transaction(connection, sender())
    }

    when(Connected) {
      case Event(request: String, connection: Connection) =>
        connection.sink ! ByteString(request)
        goto(AwaitResult) using Transaction(connection, sender())
    }

    when(AwaitResult) {
      case Event(bytes: ByteString, Transaction(connection, requestor)) =>
        requestor ! bytes
        goto(Connected) using connection
      // A new request while one is outstanding is sent immediately and its sender
      // replaces the previous requestor, which will then never receive a reply.
      case Event(request: String, Transaction(connection, _)) =>
        connection.sink ! ByteString(request)
        stay using Transaction(connection, sender())
    }

    // Any other message while a connection exists (including the completion or
    // failure signal delivered by the stream's sink) drops the connection.
    whenUnhandled {
      case Event(_, connection: Connection) =>
        connection.sink ! PoisonPill
        goto(Disconnected) using Empty
      case Event(_, Transaction(connection, requestor)) =>
        connection.sink ! PoisonPill
        // Fail the pending ask right away rather than letting it run into its timeout.
        requestor ! akka.actor.Status.Failure(
          new java.io.IOException("connection dropped before a response was received"))
        goto(Disconnected) using Empty
    }

    // Must be the last call in the FSM constructor (see the Akka FSM documentation).
    initialize()

    /**
     * Materializes a TCP stream to 127.0.0.1:9999.
     *
     * Bytes sent to the returned connection's `sink` actor are written to the
     * socket (at most 10 are buffered; overflowing the buffer fails the stream).
     * Bytes read from the socket are delivered to this actor as messages, and a
     * local `Done` marker is delivered when the stream completes.
     *
     * Relies on the implicit `system` and `materializer` of the enclosing object.
     */
    private def makeConnection(): Connection = {
      case object Done
      val outgoing = Source.actorRef[ByteString](10, OverflowStrategy.fail)
      val tcpFlow = Tcp().outgoingConnection("127.0.0.1", 9999)
      val incoming = Sink.actorRef(self, Done)
      Connection(outgoing.via(tcpFlow).to(incoming).run())
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment