Created
January 27, 2015 14:27
-
-
Save ktoso/d7603819c5aebed81545 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*
 * Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.stream.webinar | |
import java.net.InetSocketAddress | |
import akka.actor.ActorSystem | |
import akka.stream.FlowMaterializer | |
import akka.stream.scaladsl.{ Concat, Source, UndefinedSink, UndefinedSource, FlowGraphImplicits, Flow, StreamTcp } | |
import akka.stream.scaladsl.StreamTcp.OutgoingConnection | |
import akka.util.ByteString | |
import scala.annotation.tailrec | |
/**
 * Interactive line-based TCP client (despite the name it does not speak HTTP).
 *
 * Connects to 127.0.0.1:8888 and runs a small read-eval-print loop: every
 * complete line received from the server is printed, the user is prompted for
 * a reply, and that reply is sent back to the server. Typing `q` (or closing
 * standard input) shuts the actor system down after sending a final `BYE`.
 */
object HttpClient extends App {
  implicit val sys = ActorSystem("stream-tcp-system")
  implicit val mat = FlowMaterializer()

  //#setup
  val localhost = new InetSocketAddress("127.0.0.1", 8888)
  val connection: OutgoingConnection = StreamTcp().outgoingConnection(localhost)

  val repl = Flow[ByteString]
    // Re-frame the raw TCP chunks into "\n"-terminated lines.
    .transform(() ⇒ RecipeParseLines.parseLines("\n", maximumLineBytes = 256))
    .map(text ⇒ println("Server: " + text))
    // readLine yields null once standard input is exhausted; wrap it so that
    // end-of-input is handled explicitly instead of being sent as "null".
    .map(_ ⇒ Option(readLine("> ")))
    .map {
      case None | Some("q") ⇒
        sys.shutdown(); ByteString("BYE\n")
      // The peer splits its input on "\n", so every outgoing message must be
      // terminated with the separator or it will never be seen as a line.
      case Some(text) ⇒ ByteString(s"$text\n")
    }

  connection.handleWith(repl)
}
/**
 * Line-based TCP echo server (despite the name it does not speak HTTP).
 *
 * Binds to 127.0.0.1:8888. Each accepted connection first receives a welcome
 * banner and is then served by an echo flow that answers every received line
 * with the same text followed by `!!!`.
 */
object HttpServer extends App {
  implicit val sys = ActorSystem("stream-tcp-system")
  implicit val mat = FlowMaterializer()

  //#setup
  val localhost = new InetSocketAddress("127.0.0.1", 8888)
  val binding = StreamTcp().bind(localhost)

  //#welcome-banner-chat-server
  binding.connections foreach { connection ⇒

    val serverLogic = Flow() { implicit b ⇒
      import FlowGraphImplicits._

      // to be filled in by StreamTCP
      val in = UndefinedSource[ByteString]
      val out = UndefinedSink[ByteString]

      // The banner ends with a newline so that a client splitting its input
      // on "\n" receives the last banner line as a complete line.
      val welcomeMsg =
        s"""|Welcome to: ${connection.localAddress}!
            |You are: ${connection.remoteAddress}!""".stripMargin + "\n"

      val welcome = Source.single(ByteString(welcomeMsg))

      // Incoming bytes are framed into lines; each reply is terminated with
      // "\n" again so the peer can frame the responses the same way.
      val echo = Flow[ByteString]
        .transform(() ⇒ RecipeParseLines.parseLines("\n", maximumLineBytes = 256))
        .map(line ⇒ ByteString(line + "!!!\n"))

      val concat = Concat[ByteString]

      // first we emit the welcome message,
      welcome ~> concat.first
      // then we continue using the echo-logic Flow
      in ~> echo ~> concat.second

      concat.out ~> out
      (in, out)
    }

    connection.handleWith(serverLogic)
  }
}
object RecipeParseLines {
  import akka.stream.stage._

  //#parse-lines
  /**
   * Creates a stage that re-frames a stream of arbitrary byte chunks into
   * lines delimited by `separator`. Emitted lines are decoded as UTF-8 and do
   * not contain the separator.
   *
   * The stream is failed with an `IllegalStateException` as soon as a single
   * line, including its separator, needs more than `maximumLineBytes` bytes.
   * The limit applies to each line individually, so a chunk carrying several
   * short, complete lines is accepted even if the chunk as a whole is larger
   * than the limit.
   *
   * @param separator        non-empty line terminator sequence
   * @param maximumLineBytes maximum number of bytes (separator included) that
   *                         may be buffered for one line
   * @return a fresh, stateful stage instance; it holds a mutable buffer, so a
   *         new instance has to be created for every materialization
   */
  def parseLines(separator: String, maximumLineBytes: Int): StatefulStage[ByteString, String] = {
    // An empty separator can never match and would otherwise blow up with a
    // NoSuchElementException on `separatorBytes.head` below.
    require(separator.nonEmpty, "separator must not be empty")

    new StatefulStage[ByteString, String] {
      private val separatorBytes = ByteString(separator)
      private val firstSeparatorByte = separatorBytes.head
      // Bytes received so far that do not yet form a complete line.
      private var buffer = ByteString.empty
      // Index in `buffer` from which the search for the separator resumes.
      private var nextPossibleMatch = 0

      def initial = new State {
        override def onPush(chunk: ByteString, ctx: Context[String]): Directive = {
          buffer ++= chunk
          // Extract complete lines first: only what is left over afterwards is
          // data for which no (acceptable) line terminator has been seen yet.
          val parsedLines = doParse(Vector.empty)
          if (buffer.size > maximumLineBytes)
            ctx.fail(new IllegalStateException(s"Read ${buffer.size} bytes " +
              s"which is more than $maximumLineBytes without seeing a line terminator"))
          else emit(parsedLines.iterator, ctx)
        }

        @tailrec
        private def doParse(parsedLinesSoFar: Vector[String]): Vector[String] = {
          val possibleMatchPos = buffer.indexOf(firstSeparatorByte, from = nextPossibleMatch)
          if (possibleMatchPos == -1) {
            // No matching character, we need to accumulate more bytes into the buffer
            nextPossibleMatch = buffer.size
            parsedLinesSoFar
          } else if (possibleMatchPos + separatorBytes.size > buffer.size) {
            // We have found a possible match (we found the first character of the terminator
            // sequence) but we don't have yet enough bytes. We remember the position to
            // retry from next time.
            nextPossibleMatch = possibleMatchPos
            parsedLinesSoFar
          } else if (buffer.slice(possibleMatchPos, possibleMatchPos + separatorBytes.size)
            == separatorBytes) {
            val lineEnd = possibleMatchPos + separatorBytes.size
            if (lineEnd > maximumLineBytes) {
              // The terminator lies beyond the allowed line length. Stop here and
              // keep the oversized line in the buffer so that onPush fails the stream.
              parsedLinesSoFar
            } else {
              // Found a match
              val parsedLine = buffer.slice(0, possibleMatchPos).utf8String
              buffer = buffer.drop(lineEnd)
              // The consumed prefix is gone, so the next search starts at the
              // beginning of the remaining buffer.
              nextPossibleMatch = 0
              doParse(parsedLinesSoFar :+ parsedLine)
            }
          } else {
            // False alarm: only the first separator byte matched. Continue right
            // after it instead of rescanning the bytes before it again.
            nextPossibleMatch = possibleMatchPos + 1
            doParse(parsedLinesSoFar)
          }
        }
      }
    }
  }
  //#parse-lines
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment