-
-
Save jrudolph/43151f78de51c687018d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
object Tcp { | |
trait ConnectionDescriptor { | |
def remoteAddress: InetSocketAddress | |
def localAddress: InetSocketAddress | |
} | |
sealed trait ConnectionFlow extends Flow[ByteString, ByteString] { | |
def remoteAddress(mMap: MaterializedMap): InetSocketAddress | |
def localAddress(mMap: MaterializedMap): InetSocketAddress | |
def handleWith(handler: Flow[ByteString, ByteString]): RunnableFlow | |
} | |
// passive server-side bind (sugar for the most frequent case) | |
// The user-specified flow is materialized once per accepted connection. | |
// Acceptance can be throttled by applying back-pressure to the produced `Source[ConnectionDescriptor] ` | |
def bind(endpoint: InetSocketAddress, ...) | |
(handler: Flow[ByteString, ByteString]): Source[ConnectionDescriptor] = | |
generalBind(endpoint, ...)(_ handleWith handler) | |
// active or passive server-side, | |
// assumes availability of an akka-stream level vehicle for getting | |
// access to the MaterializationMap from within the stream | |
// (https://github.com/akka/akka/issues/16168) | |
def generalBind(endpoint: InetSocketAddress, ...) | |
(handler: ConnectionFlow => RunnableFlow): Source[ConnectionDescriptor] | |
// active or passive client-side | |
// The ConnectionFlow result is produced synchronously but the actual connection | |
// is not attempted before the flow is materialized on the user side. | |
def connectionFlow(remoteAddress: InetSocketAddress, ...): ConnectionFlow | |
} | |
// example API usage | |
object Echo { | |
val echoFlow = Flow[ByteString].map(identity) | |
// active client-side (the usual mode), | |
// i.e. the client connects and starts the conversation | |
def activeClient(remoteAddress: InetSocketAddress): RunnableFlow = | |
Source(...).connect(Tcp.connectionFlow(addr)).connect(ForeachSink(println)) | |
// passive server-side (the usual mode), | |
// i.e. the client connects and starts the conversation | |
def passiveServer(endpoint: InetSocketAddress): Source[ConnectionDescriptor] = | |
Tcp.bind(endpoint, echoFlow) | |
// passive client-side (the unusual mode), | |
// i.e. the client connects but the server starts the conversation | |
def passiveClient(remoteAddress: InetSocketAddress): RunnableFlow = | |
Tcp.connectionFlow(remoteAddress).handleWith(echoFlow) | |
// active server-side (the unusual mode), | |
// i.e. the client connects but the server starts the conversation | |
def activeServer(endpoint: InetSocketAddress): Source[ConnectionDescriptor] = | |
Tcp.generalBind(endpoint) { connectionFlow => | |
Source(...).connect(connectionFlow).connect(ForeachSink(println)) | |
} | |
} | |
object SSL { | |
def upgradeClientSide(connectionFlow: ConnectionFlow, ...): ConnectionFlow | |
def upgradeServerSide(connectionFlow: ConnectionFlow, ...): ConnectionFlow | |
} | |
object Http { | |
// server-side | |
def serverFlowToTransport(serverFlow: Flow[HttpRequest, HttpResponse]): Flow[ByteString, ByteString] | |
def bind(endpoint: InetSocketAddress, ...) | |
(handler: Flow[HttpRequest, HttpResponse]): Source[ConnectionDescriptor] = | |
Tcp.bind(endpoint)(serverFlowToTransport(handler)) | |
// client-side | |
// (assumes the availability of a context-providing stream combinator for 1-to-1 streams) | |
trait ClientFlow extends Flow[HttpRequest, HttpResponse] { | |
def remoteAddress(mMap: MaterializedMap): InetSocketAddress | |
def localAddress(mMap: MaterializedMap): InetSocketAddress | |
} | |
def transportToClientFlow(transport: Flow[ByteString, ByteString]): ClientFlow | |
def clientFlow(remoteAddress: InetSocketAddress, ...): ClientFlow = | |
transportToClientFlow(Tcp.connectionFlow(remoteAddress)) | |
} | |
object HttpExample { | |
// simple server | |
Http.bind(...) { | |
Flow[HttpRequest].map { | |
case HttpRequest(...) => HttpResponse(...) | |
} | |
} | |
// simple server with ssl | |
Tcp.bind(...) { | |
SSL.upgradeServerSide { | |
Http.serverFlowToTransport { | |
Flow[HttpRequest].map { | |
case HttpRequest(...) => HttpResponse(...) | |
} | |
} | |
} | |
} | |
// simple client flow (can be reused, but needs to be run) | |
Source.singleton(HttpRequest("ping")) | |
.connect(Http.clientFlow(address)) | |
.connect(ForeachSink(println)): RunnableFlow | |
// simple client flow with SSL (can be reused, but needs to be run) | |
Source.singleton(HttpRequest("ping")) | |
.connect { | |
Http.transportToClientFlow { | |
SSL.upgradeClientSide { | |
Tcp.connectionFlow(address) | |
} | |
} | |
} | |
.connect(ForeachSink(println)): RunnableFlow | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment