Skip to content

Instantly share code, notes, and snippets.

@ktoso
Last active April 19, 2016 10:09
Show Gist options
  • Select an option

  • Save ktoso/4dda7752bf6f4393d1ac to your computer and use it in GitHub Desktop.

Select an option

Save ktoso/4dda7752bf6f4393d1ac to your computer and use it in GitHub Desktop.
Akka Streams: opening a new TCP connection if the server closes it
-----
asdf server (does nothing, then times out):
TIMEOUT: Exception `WEBrick::HTTPStatus::EOFError' at /System/Library/Frameworks/Ruby.framework/Versions/2.0/usr/lib/ruby/2.0.0/webrick/httpserver.rb:80 - WEBrick::HTTPStatus::EOFError
-----
[info] Running akka.stream.will.WillApp
RECONNECT: start a new connection!
BECAUSE TCP CLOSED: [INFO] [02/11/2015 01:29:36.962] [default-akka.actor.default-dispatcher-3] [akka://default/system/IO-TCP-STREAM/client-1-%2F127.0.0.1%3A9292] Message [akka.io.Tcp$Closed$] from Actor[akka://default/system/IO-TCP/selectors/$a/0#234029134] to Actor[akka://default/system/IO-TCP-STREAM/client-1-%2F127.0.0.1%3A9292#49317068] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [02/11/2015 01:29:36.962] [default-akka.actor.default-dispatcher-3] [akka://default/system/IO-TCP/selectors/$a/0] Message [akka.dispatch.sysmsg.DeathWatchNotification] from Actor[akka://default/system/IO-TCP/selectors/$a/0#234029134] to Actor[akka://default/system/IO-TCP/selectors/$a/0#234029134] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
/*
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.will
import java.net.InetSocketAddress
import akka.actor.ActorSystem
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl.{ FlowGraphImplicits, Sink, Broadcast, UndefinedSource, UndefinedSink, Flow, StreamTcp }
import akka.stream.stage.{ TerminationDirective, Directive, Context, PushStage }
import akka.util.ByteString
object WillApp extends App {
implicit val sys = ActorSystem()
implicit val mat = ActorFlowMaterializer()
val localhost = new InetSocketAddress("127.0.0.1", 9292)
def connection = StreamTcp().outgoingConnection(localhost)
def handler1: Flow[ByteString, ByteString] = Flow() { implicit b ⇒
import akka.stream.scaladsl.FlowGraphImplicits._
val out = UndefinedSink[ByteString]
val in = UndefinedSource[ByteString]
val bcast = Broadcast[ByteString]
val onCompleteStartNewConnection = Sink.onComplete { _ ⇒
println("start a new connection!")
connection.handleWith(handler1)
}
in ~> bcast ~> onCompleteStartNewConnection
bcast ~> Flow[ByteString].map { i ⇒ println(i.size); i } ~> out
in → out
}
def handler2: Flow[ByteString, ByteString] = Flow() { implicit b ⇒
import FlowGraphImplicits._
val out = UndefinedSink[ByteString]
val in = UndefinedSource[ByteString]
in ~> Flow[ByteString].transform(() ⇒ new PushStage[ByteString, ByteString] {
override def onPush(elem: ByteString, ctx: Context[ByteString]): Directive = {
println("elem.size = " + elem.size)
ctx.push(elem)
}
override def onUpstreamFinish(ctx: Context[ByteString]): TerminationDirective = {
println("start a new connection!")
connection.handleWith(handler2)
ctx.finish()
}
}) ~> out
in → out
}
connection.handleWith(handler2)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment