Skip to content

Instantly share code, notes, and snippets.

@ktoso
Created January 27, 2015 14:27
Show Gist options
  • Save ktoso/d7603819c5aebed81545 to your computer and use it in GitHub Desktop.
Save ktoso/d7603819c5aebed81545 to your computer and use it in GitHub Desktop.
/*
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.webinar
import java.net.InetSocketAddress
import akka.actor.ActorSystem
import akka.stream.FlowMaterializer
import akka.stream.scaladsl.{ Concat, Source, UndefinedSink, UndefinedSource, FlowGraphImplicits, Flow, StreamTcp }
import akka.stream.scaladsl.StreamTcp.OutgoingConnection
import akka.util.ByteString
import scala.annotation.tailrec
object HttpClient extends App {
implicit val sys = ActorSystem("stream-tcp-system")
implicit val mat = FlowMaterializer()
//#setup
val localhost = new InetSocketAddress("127.0.0.1", 8888)
val connection: OutgoingConnection = StreamTcp().outgoingConnection(localhost)
val repl = Flow[ByteString]
.transform(() ⇒ RecipeParseLines.parseLines("\n", maximumLineBytes = 256))
.map(text ⇒ println("Server: " + text))
.map(_ ⇒ readLine("> "))
.map {
case "q" ⇒
sys.shutdown(); ByteString("BYE")
case text ⇒ ByteString(s"$text")
}
connection.handleWith(repl)
}
object HttpServer extends App {
implicit val sys = ActorSystem("stream-tcp-system")
implicit val mat = FlowMaterializer()
//#setup
val localhost = new InetSocketAddress("127.0.0.1", 8888)
val binding = StreamTcp().bind(localhost)
//#welcome-banner-chat-server
binding.connections foreach { connection ⇒
val serverLogic = Flow() { implicit b ⇒
import FlowGraphImplicits._
// to be filled in by StreamTCP
val in = UndefinedSource[ByteString]
val out = UndefinedSink[ByteString]
val welcomeMsg =
s"""|Welcome to: ${connection.localAddress}!
|You are: ${connection.remoteAddress}!""".stripMargin
val welcome = Source.single(ByteString(welcomeMsg))
val echo = Flow[ByteString]
.transform(() ⇒ RecipeParseLines.parseLines("\n", maximumLineBytes = 256))
.map(_ ++ "!!!")
.map(ByteString(_))
val concat = Concat[ByteString]
// first we emit the welcome message,
welcome ~> concat.first
// then we continue using the echo-logic Flow
in ~> echo ~> concat.second
concat.out ~> out
(in, out)
}
connection.handleWith(serverLogic)
}
}
object RecipeParseLines {
import akka.stream.stage._
//#parse-lines
def parseLines(separator: String, maximumLineBytes: Int) =
new StatefulStage[ByteString, String] {
private val separatorBytes = ByteString(separator)
private val firstSeparatorByte = separatorBytes.head
private var buffer = ByteString.empty
private var nextPossibleMatch = 0
def initial = new State {
override def onPush(chunk: ByteString, ctx: Context[String]): Directive = {
buffer ++= chunk
if (buffer.size > maximumLineBytes)
ctx.fail(new IllegalStateException(s"Read ${buffer.size} bytes " +
s"which is more than $maximumLineBytes without seeing a line terminator"))
else emit(doParse(Vector.empty).iterator, ctx)
}
@tailrec
private def doParse(parsedLinesSoFar: Vector[String]): Vector[String] = {
val possibleMatchPos = buffer.indexOf(firstSeparatorByte, from = nextPossibleMatch)
if (possibleMatchPos == -1) {
// No matching character, we need to accumulate more bytes into the buffer
nextPossibleMatch = buffer.size
parsedLinesSoFar
} else {
if (possibleMatchPos + separatorBytes.size > buffer.size) {
// We have found a possible match (we found the first character of the terminator
// sequence) but we don't have yet enough bytes. We remember the position to
// retry from next time.
nextPossibleMatch = possibleMatchPos
parsedLinesSoFar
} else {
if (buffer.slice(possibleMatchPos, possibleMatchPos + separatorBytes.size)
== separatorBytes) {
// Found a match
val parsedLine = buffer.slice(0, possibleMatchPos).utf8String
buffer = buffer.drop(possibleMatchPos + separatorBytes.size)
nextPossibleMatch -= possibleMatchPos + separatorBytes.size
doParse(parsedLinesSoFar :+ parsedLine)
} else {
nextPossibleMatch += 1
doParse(parsedLinesSoFar)
}
}
}
}
}
}
//#parse-lines
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment