Last active
April 27, 2016 02:10
-
-
Save josdirksen/9e0dbdc59f82030086fa to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Build definition for the akka-http websockets demo project.
name := "akka-http-websockets"

version := "1.0"

scalaVersion := "2.11.6"

// akka-stream / akka-http-core for the server, play-json for rendering the
// stats, akka-mon for the counters and Java-WebSocket for the test client.
libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-stream-experimental" % "1.0-RC2",
  "com.typesafe.akka" %% "akka-http-core-experimental" % "1.0-RC2",
  "com.typesafe.play" %% "play-json" % "2.3.4",
  "org.akkamon" %% "akka-mon" % "1.0-SNAPSHOT",
  "org.java-websocket" % "Java-WebSocket" % "1.3.0")

// Resolve over HTTPS: artifacts fetched over plain HTTP can be tampered with in transit.
resolvers += "Akka Snapshot Repository" at "https://repo.akka.io/snapshots/"
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.File | |
import akka.actor._ | |
import akka.routing.{Routee, RemoveRoutee, ActorRefRoutee, AddRoutee} | |
import akka.stream.actor.ActorPublisher | |
import org.akkamon.core.ActorStack | |
import org.akkamon.core.instruments.CounterTrait | |
import play.api.libs.json.Json | |
import scala.annotation.tailrec | |
import scala.concurrent.duration._ | |
/**
 * Stream source that pushes stats messages to a single subscriber.
 *
 * On start the actor registers itself as a routee with `router`. Every `String`
 * it receives afterwards is stored in a bounded queue (the oldest entry is dropped
 * once `MaxBufferSize` entries are buffered) and emitted through `onNext` as long
 * as the subscriber has outstanding demand. This means every subscriber keeps its
 * own buffer; this could be rewritten to store the stats centrally and pull them
 * from there.
 *
 * Based on the standard publisher example from the akka docs.
 *
 * @param router actor that receives the AddRoutee / RemoveRoutee registration messages
 * @param id     optional client identifier, used in the log output and the counter name
 */
class VMStatsPublisher(router: ActorRef, id: Option[String]) extends ActorPublisher[String] with ActorStack {

  import akka.stream.actor.ActorPublisherMessage._
  import scala.collection.mutable

  /** Self-message signalling that the queue received data while no delivery was pending. */
  private case object QueueUpdated

  val MaxBufferSize = 50
  val queue = mutable.Queue[String]()
  var queueUpdated = false

  // Counter name for this publisher. The id is optional (a client may connect
  // without one), so fall back to a fixed label instead of calling Option.get.
  private val counterName = "count.invocation-actorpublisher-" + id.getOrElse("unknown")

  // on startup, register with the router
  override def preStart(): Unit = {
    println(s"Adding to router: $self")
    router ! AddRoutee(ActorRefRoutee(self))
  }

  // cleanly remove this actor from the router. If
  // we don't do this the router will also stop since
  // we didn't specify a different supervisor strategy
  override def postStop(): Unit = {
    println(s"Removing from router: $self")
    router ! RemoveRoutee(ActorRefRoutee(self))
  }

  def receive = {
    // receive new stats, add them to the queue, and quickly exit.
    case stats: String =>
      // make room by removing the oldest entry when the buffer is full
      if (queue.size >= MaxBufferSize) queue.dequeue()
      queue += stats
      // only one QueueUpdated may be in flight at any time
      if (!queueUpdated) {
        queueUpdated = true
        self ! QueueUpdated
      }

    // there are new items in the queue; deliver them if there is demand
    case QueueUpdated => deliver()

    // the subscriber requests more messages. The requested amount does not have
    // to be tracked here, deliver() reads totalDemand instead.
    case Request(_) => deliver()

    // subscriber stops, so we stop ourselves.
    case Cancel => context.stop(self)
  }

  /**
   * Deliver queued messages to the subscriber while there is demand. In the case of
   * websockets over TCP, note that even if we have a slow consumer, we won't notice
   * that immediately. First the buffers will fill up before we get feedback.
   */
  @tailrec final def deliver(): Unit = {
    if (totalDemand == 0) {
      id match {
        case Some(i) => println(s"No more demand for $i")
        case _ => println(s"No more demand for: $this")
      }
    }

    if (queue.isEmpty && totalDemand != 0) {
      // we can respond to QueueUpdated messages again, since
      // we can't do anything until our queue contains stuff again.
      queueUpdated = false
    } else if (totalDemand > 0 && queue.nonEmpty) {
      // also send a message to the counter
      exporter.processCounter(counterName)
      onNext(queue.dequeue())
      deliver()
    }
  }
}
/**
 * Periodically collects some VM and file-system stats and sends them, rendered
 * as pretty-printed JSON, to the provided actorRef.
 *
 * @param router   recipient of the JSON stats messages
 * @param delay    time to wait before the first stats message is sent
 * @param interval time between two subsequent stats messages
 */
class VMActor(router: ActorRef, delay: FiniteDuration, interval: FiniteDuration) extends Actor {

  // run the timer on this actor's dispatcher instead of the global execution context
  import context.dispatcher

  // Keep the handle of the periodic task so it can be cancelled when the actor
  // stops; otherwise the scheduler would keep sending stats for a stopped actor.
  private val statsTask: Cancellable =
    context.system.scheduler.schedule(delay, interval) {
      router ! Json.prettyPrint(Json.obj("stats" -> getStats))
    }

  override def postStop(): Unit = {
    statsTask.cancel()
    super.postStop()
  }

  override def receive: Actor.Receive = {
    case _ => // just ignore any messages
  }

  /**
   * Takes a snapshot of the current stats.
   *
   * @return processor count and memory figures of the runtime, plus the total, free
   *         and usable space of every file-system root, keyed by counter name
   */
  def getStats: Map[String, Long] = {
    val runtime = Runtime.getRuntime
    val baseStats = Map[String, Long](
      "count.procs" -> runtime.availableProcessors().toLong,
      "count.mem.free" -> runtime.freeMemory(),
      "count.mem.maxMemory" -> runtime.maxMemory(),
      "count.mem.totalMemory" -> runtime.totalMemory()
    )

    // File.listRoots() returns null when the roots cannot be determined
    val roots = Option(File.listRoots()).map(_.toSeq).getOrElse(Seq.empty)
    val fileSystemStats = roots.flatMap { root =>
      val path = root.getAbsolutePath
      Seq(
        s"count.fs.total.$path" -> root.getTotalSpace,
        s"count.fs.free.$path" -> root.getFreeSpace,
        s"count.fs.usuable.$path" -> root.getUsableSpace
      )
    }

    baseStats ++ fileSystemStats
  }
}
/**
 * Simple router whose set of routees can grow and shrink at runtime. AddRoutee and
 * RemoveRoutee messages update the set; every other message is sent on to all
 * current routees with the original sender. This actor is not immutable.
 */
class RouterActor extends Actor with CounterTrait {

  var routees = Set[Routee]()

  // to make stats more clear (presumably the label CounterTrait reports under — verify)
  actorName = "VMActor"

  def receive = {
    case AddRoutee(routee) => routees += routee
    case RemoveRoutee(routee) => routees -= routee
    case other =>
      for (routee <- routees) routee.send(other, sender())
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.net.URI | |
import akka.actor.Actor.Receive | |
import org.akkamon.core.ActorStack | |
import org.akkamon.core.exporters.StatsdExporter | |
import org.java_websocket.client.WebSocketClient | |
import org.java_websocket.drafts.{Draft_17} | |
import org.java_websocket.handshake.ServerHandshake | |
/**
 * A very simple websocket client, used to simulate slow clients so that
 * backpressure can be shown in action with websockets.
 */
object WSClient extends App {

  val NumberOfClients = 10
  val RandomRange = 100
  val Base = 50

  // create and connect the clients, each with its own artificial delay
  for (clientId <- 1 to NumberOfClients) {
    val clientDelay = Math.round(Math.random() * RandomRange + Base)
    val client = new Client(clientId, clientDelay)
    Thread.sleep(10)
    client.connect()
  }

  /**
   * Websocket client that sleeps for `delay` milliseconds on every received message
   * before counting it.
   *
   * @param id    client number, sent as the `id` query parameter and used in the counter name
   * @param delay time in milliseconds to sleep for each incoming message
   */
  class Client(id: Int, delay: Long)
    extends WebSocketClient(new URI(s"ws://localhost:9001/stats?id=$id"), new Draft_17) {

    var count = 0
    val exporter = StatsdExporter

    override def onMessage(message: String): Unit = {
      // simulate a slow consumer
      Thread.sleep(delay)
      exporter.processCounter(s"count.invocation-websocketclient-$id")
      count += 1
      // report progress once every 100 messages
      if (count % 100 == 0) {
        println(f"$id%2d:onmessage:$count%5d")
      }
    }

    override def onClose(code: Int, reason: String, remote: Boolean): Unit = {
      println("Websocket closed")
    }

    override def onOpen(handshakedata: ServerHandshake): Unit = {
      println(s"Websocket openend: delay = $delay")
    }

    override def onError(ex: Exception): Unit = {
      println(s"Websocket error$ex")
    }
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.net.{SocketOptions, Inet4Address, InetAddress, Socket} | |
import java.util.concurrent.TimeoutException | |
import akka.actor.{ActorRef, Props, ActorSystem} | |
import akka.http.scaladsl.Http | |
import akka.http.scaladsl.model.HttpMethods._ | |
import akka.http.scaladsl.model._ | |
import akka.http.scaladsl.model.ws._ | |
import akka.io.Inet | |
import akka.io.Inet.SO | |
import akka.routing.BroadcastGroup | |
import akka.stream.ActorFlowMaterializer | |
import akka.stream.scaladsl._ | |
import scala.concurrent.Await | |
import scala.concurrent.duration._ | |
import scala.util.Random | |
/**
 * Extractor to detect websocket requests. It matches, yielding the request itself,
 * when the request carries an UpgradeToWebsocket header and does not match otherwise.
 */
object WSRequest {
  def unapply(req: HttpRequest): Option[HttpRequest] =
    req.header[UpgradeToWebsocket].map(_ => req)
}
/**
 * Simple websocket server using akka-http and akka-streams.
 *
 * Note that about 600 messages get queued up in the send buffer (on mac, 146988 is default socket buffer)
 */
object WSServer extends App {

  import scala.util.control.NonFatal

  // required actorsystem and flow materializer
  implicit val system = ActorSystem("websockets")
  implicit val fm = ActorFlowMaterializer()

  // setup the actors for the stats
  // router: will keep a list of connected actorpublishers, to inform them about new stats.
  // vmactor: will start sending messages to the router, which will pass them on to any
  // connected routee
  val router: ActorRef = system.actorOf(Props[RouterActor], "router")
  val vmactor: ActorRef = system.actorOf(Props(classOf[VMActor], router, 2.seconds, 25.milliseconds))

  // Bind to an HTTP port and handle incoming messages.
  // With the custom extractor we're always certain the header contains
  // the correct upgrade message.
  // We can pass in a socketoptions to tune the buffer behavior
  // e.g options = List(Inet.SO.SendBufferSize(100))
  val binding = Http().bindAndHandleSync({
    case WSRequest(req@HttpRequest(GET, Uri.Path("/simple"), _, _, _)) => handleWith(req, Flows.reverseFlow)
    case WSRequest(req@HttpRequest(GET, Uri.Path("/echo"), _, _, _)) => handleWith(req, Flows.echoFlow)
    case WSRequest(req@HttpRequest(GET, Uri.Path("/graph"), _, _, _)) => handleWith(req, Flows.graphFlow)
    case WSRequest(req@HttpRequest(GET, Uri.Path("/graphWithSource"), _, _, _)) => handleWith(req, Flows.graphFlowWithExtraSource)
    case WSRequest(req@HttpRequest(GET, Uri.Path("/stats"), _, _, _)) => handleWith(req, Flows.graphFlowWithStats(router, req.getUri().parameter("id")))
    case _: HttpRequest => HttpResponse(400, entity = "Invalid websocket request")
  }, interface = "localhost", port = 9001)

  // binding is a future, we assume it's ready within a second or timeout
  try {
    Await.result(binding, 1.second)
    println("Server online at http://localhost:9001")
  } catch {
    case exc: TimeoutException =>
      println("Server took to long to startup, shutting down")
      system.shutdown()
    // Await.result rethrows the failure of the binding future (e.g. the port is
    // already in use). Shut the actor system down in that case as well, otherwise
    // its threads keep the JVM alive without a running server.
    case NonFatal(exc) =>
      println(s"Server failed to start: $exc, shutting down")
      system.shutdown()
  }

  /**
   * Simple helper function, that connects a flow to a specific websocket upgrade request.
   *
   * @param req  the incoming request, expected to carry an UpgradeToWebsocket header
   * @param flow the flow that handles the websocket messages of this connection
   * @return the upgrade response, or a 400 response when the header is missing
   */
  def handleWith(req: HttpRequest, flow: Flow[Message, Message, Unit]): HttpResponse =
    req.header[UpgradeToWebsocket] match {
      case Some(upgrade) => upgrade.handleMessages(flow)
      case None => HttpResponse(400, entity = "Invalid websocket request")
    }
}
/**
 * This object contains the flows that handle the websocket messages. Each flow is attached
 * to a websocket and gets executed whenever a message comes in from the client.
 */
object Flows {

  /**
   * The simple flow just reverses the sent message and returns it to the client. There
   * are two types of messages, streams and normal text messages. We only process the
   * normal ones here; any other message is answered with a "Not supported message type" text.
   */
  def reverseFlow: Flow[Message, Message, Unit] = {
    Flow[Message].map {
      case TextMessage.Strict(txt) => TextMessage.Strict(txt.reverse)
      case _ => TextMessage.Strict("Not supported message type")
    }
  }

  /**
   * Simple flow which just returns the original message
   * back to the client
   */
  def echoFlow: Flow[Message, Message, Unit] = Flow[Message]

  /**
   * Flow which uses a graph to process the incoming message.
   *
   *                         compute
   *  collect ~> broadcast ~> compute ~> zip ~> map
   *                         compute
   *
   * We broadcast the message to three map functions, we
   * then zip them all up, and map them to the response
   * message which we return. Messages that are not strict
   * text messages are dropped by the collect step.
   *
   * @return the flow built from the graph above
   */
  def graphFlow: Flow[Message, Message, Unit] = {
    Flow() { implicit b =>
      import FlowGraph.Implicits._

      val collect = b.add(Flow[Message].collect[String]({
        case TextMessage.Strict(txt) => txt
      }))

      // setup the components of the flow
      val compute1 = b.add(Flow[String].map(_ + ":1"))
      val compute2 = b.add(Flow[String].map(_ + ":2"))
      val compute3 = b.add(Flow[String].map(_ + ":3"))

      val broadcast = b.add(Broadcast[String](3))
      val zip = b.add(ZipWith[String,String,String,String]((s1, s2, s3) => s1 + s2 + s3))
      val mapToMessage = b.add(Flow[String].map[TextMessage](TextMessage.Strict))

      // now we build up the flow
                 broadcast ~> compute1 ~> zip.in0
      collect ~> broadcast ~> compute2 ~> zip.in1
                 broadcast ~> compute3 ~> zip.in2

      zip.out ~> mapToMessage

      (collect.inlet, mapToMessage.outlet)
    }
  }

  /**
   * When the flow is materialized we don't really just have to respond with a single
   * message. Any message that is produced from the flow gets sent to the client. This
   * means we can also attach an additional source to the flow and use that to push
   * messages to the client.
   *
   * So this flow looks like this:
   *
   *        in ~> filter ~> merge
   *           newSource ~> merge ~> map
   *
   * This flow filters out the incoming messages, and the merge will only see messages
   * from our new source. All these messages get sent to the connected websocket.
   *
   * @return the flow built from the graph above
   */
  def graphFlowWithExtraSource: Flow[Message, Message, Unit] = {
    Flow() { implicit b =>
      import FlowGraph.Implicits._

      // Graph elements we'll use
      val merge = b.add(Merge[Int](2))
      val filter = b.add(Flow[Int].filter(_ => false))

      // convert to int so we can connect to merge
      val mapMsgToInt = b.add(Flow[Message].map[Int] { msg => -1 })
      val mapIntToMsg = b.add(Flow[Int].map[Message]( x => TextMessage.Strict(":" + randomPrintableString(200) + ":" + x.toString)))
      val log = b.add(Flow[Int].map[Int](x => {println(x); x}))

      // source we want to use to send message to the connected websocket sink
      val rangeSource = b.add(Source(1 to 2000))

      // connect the graph
      mapMsgToInt ~> filter ~> merge // this part of the merge will never provide msgs
         rangeSource ~> log ~> merge ~> mapIntToMsg

      // expose ports
      (mapMsgToInt.inlet, mapIntToMsg.outlet)
    }
  }

  /**
   * Creates a flow which uses an actor publisher as additional input. This complete scenario
   * works like this:
   *  1. When the actor is created it registers itself with a router.
   *  2. the VMActor sends messages at an interval to the router.
   *  3. The router next sends the message to this source which injects it into the flow
   *
   * @param router router the publisher actor registers itself with
   * @param id     optional identifier of the connecting client
   */
  def graphFlowWithStats(router: ActorRef, id: Option[String]): Flow[Message, Message, Unit] = {
    Flow() { implicit b =>
      import FlowGraph.Implicits._

      id match {
        case Some(i) => println(s"Connection received for stats from id: $i")
        case _ => println(s"Connection received for stats no id")
      }

      // create an actor source
      val source = Source.actorPublisher[String](Props(classOf[VMStatsPublisher],router, id))

      // Graph elements we'll use
      val merge = b.add(Merge[String](2))
      val filter = b.add(Flow[String].filter(_ => false))

      // convert to string so we can connect to merge
      val mapMsgToString = b.add(Flow[Message].map[String] { msg => "" })
      val mapStringToMsg = b.add(Flow[String].map[Message]( x => TextMessage.Strict(x)))

      val statsSource = b.add(source)

      // connect the graph
      mapMsgToString ~> filter ~> merge // this part of the merge will never provide msgs
                   statsSource ~> merge ~> mapStringToMsg

      // expose ports
      (mapMsgToString.inlet, mapStringToMsg.outlet)
    }
  }

  /**
   * Builds a string of random printable characters.
   *
   * @param length number of random characters to append; zero or a negative value appends nothing
   * @param start  prefix the random characters are appended to
   * @return `start` followed by `length` random printable characters
   */
  def randomPrintableString(length: Int, start: String = ""): String = {
    // A builder avoids re-copying the whole string for every appended character,
    // and the range is empty for a non-positive length so the loop always terminates.
    val builder = new StringBuilder(start)
    (0 until length).foreach(_ => builder += Random.nextPrintableChar())
    builder.toString
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment