Skip to content

Instantly share code, notes, and snippets.

@lhns
Last active January 17, 2019 13:45
Show Gist options
  • Select an option

  • Save lhns/624701b2e4fc7d6db2f39a0124f2a359 to your computer and use it in GitHub Desktop.

Select an option

Save lhns/624701b2e4fc7d6db2f39a0124f2a359 to your computer and use it in GitHub Desktop.
import java.net.InetSocketAddress
import java.nio.channels.AsynchronousChannelGroup
import AkkaStreamsCompat._
import akka.NotUsed
import akka.stream.TLSProtocol.{NegotiateNewSession, SslTlsInbound, SslTlsOutbound}
import akka.stream._
import akka.stream.scaladsl.{Flow, Keep, Source, TLS, Sink => AkkaSink}
import akka.util.ByteString
import cats.effect.{ConcurrentEffect, ContextShift, IO, Resource}
import fs2._
import fs2.interop.reactivestreams._
import fs2.io.tcp
import javax.net.ssl.SSLContext
import org.reactivestreams.Publisher
import scala.concurrent.Future
import scala.language.higherKinds
/*
Dependencies:
"com.typesafe.akka" %% "akka-stream" % "2.5.18",
"co.fs2" %% "fs2-core" % "1.0.2",
"co.fs2" %% "fs2-reactive-streams" % "1.0.2",
"co.fs2" %% "fs2-io" % "1.0.2"
*/
/** TLS on top of an arbitrary fs2 byte transport, implemented with Akka Streams' `TLS` stage. */
object Tls {
  /** Default capacity of the `Source.queue` buffer that feeds outbound messages into the TLS stage. */
  private val defaultBufferSize = 512

  /** Default value passed to `socket.reads` by [[tcpTransport]]. */
  private val defaultReadChunkSize = 1024

  /**
    * Turns a TCP socket resource into a byte transport.
    *
    * The returned pipe acquires the socket, writes every incoming byte to it and, concurrently,
    * emits whatever is read from it.
    *
    * @param socketResource resource yielding the socket to read from and write to
    * @param readChunkSize  value passed to `socket.reads` for each read (defaults to 1024)
    * @return a pipe from bytes to send to bytes received
    */
  def tcpTransport[F[_] : ConcurrentEffect](socketResource: Resource[F, tcp.Socket[F]],
                                            readChunkSize: Int = defaultReadChunkSize): Pipe[F, Byte, Byte] =
    outgoing =>
      Stream.resource(socketResource).flatMap { socket =>
        // The write side only produces effects, so it is drained and merged with the read side.
        outgoing.through(socket.writes()).drain merge
          socket.reads(readChunkSize)
      }

  /**
    * Creates a client-role TLS socket whose transport is a TCP connection to `to`.
    *
    * @param to remote address to connect to
    */
  def tcpClient[F[_] : ConcurrentEffect : ContextShift](to: InetSocketAddress)
                                                       (implicit materializer: Materializer,
                                                        AG: AsynchronousChannelGroup): Tls.Socket[F] =
    client[F](transport = tcpTransport(tcp.Socket.client[F](to)))

  /**
    * Creates a TLS socket in the `Client` role over the given transport.
    *
    * @param transport    pipe carrying the encrypted bytes (sent bytes in, received bytes out)
    * @param sslContext   SSL context handed to the Akka `TLS` stage
    * @param firstSession session parameters handed to the Akka `TLS` stage
    * @param bufferSize   capacity of the outbound `Source.queue` buffer
    */
  def client[F[_] : ConcurrentEffect](transport: Pipe[F, Byte, Byte],
                                      sslContext: SSLContext = SSLContext.getDefault,
                                      firstSession: NegotiateNewSession = NegotiateNewSession,
                                      bufferSize: Int = defaultBufferSize)
                                     (implicit materializer: Materializer): Tls.Socket[F] =
    new Tls.Socket[F](transport, sslContext, firstSession, Client, bufferSize)

  /**
    * Creates a TLS socket in the `Server` role over the given transport.
    *
    * @param transport    pipe carrying the encrypted bytes (sent bytes in, received bytes out)
    * @param sslContext   SSL context handed to the Akka `TLS` stage
    * @param firstSession session parameters handed to the Akka `TLS` stage
    * @param bufferSize   capacity of the outbound `Source.queue` buffer
    */
  def server[F[_] : ConcurrentEffect](transport: Pipe[F, Byte, Byte],
                                      sslContext: SSLContext = SSLContext.getDefault,
                                      firstSession: NegotiateNewSession = NegotiateNewSession,
                                      bufferSize: Int = defaultBufferSize)
                                     (implicit materializer: Materializer): Tls.Socket[F] =
    new Tls.Socket[F](transport, sslContext, firstSession, Server, bufferSize)

  /**
    * A TLS session running as an Akka Streams graph, exposed through `F`-based read/write operations.
    *
    * Note that the Akka graph is materialized (`run()`) while the instance is being constructed,
    * not when the first operation is evaluated.
    */
  class Socket[F[_]] private[Tls](transport: Pipe[F, Byte, Byte],
                                  sslContext: SSLContext,
                                  firstSession: NegotiateNewSession,
                                  role: TLSRole,
                                  bufferSize: Int)
                                 (implicit materializer: Materializer,
                                  F: ConcurrentEffect[F]) {
    /** The byte transport re-expressed on `ByteString`, the element type used by the Akka `TLS` stage. */
    private val byteStreamTransport: Pipe[F, ByteString, ByteString] =
      (stream: Stream[F, ByteString]) =>
        // Flatten each ByteString into bytes for the transport, then re-pack every received
        // chunk of bytes into a single ByteString.
        transport(stream.flatMap(byteString => Stream.chunk(Chunk.bytes(byteString.toArray))))
          .mapChunks(chunk => Chunk(ByteString(chunk.toArray)))

    private val tls = TLS(sslContext, firstSession, role)

    /** The TLS stage with its transport side closed over `byteStreamTransport`. */
    private val tlsWithTransport = tls.join(pipeToFlow(byteStreamTransport))

    // Outbound messages are offered to `inputQueue`; inbound messages are pulled from `outputQueue`.
    private val (inputQueue, outputQueue) =
      Source.queue[SslTlsOutbound](bufferSize, OverflowStrategy.backpressure)
        .viaMat(tlsWithTransport)(Keep.left)
        .toMat(AkkaSink.queue())(Keep.both)
        .run()

    /**
      * Pulls the next inbound message from the TLS stage.
      *
      * @return `Some(message)`, or `None`, which [[reads]] treats as the end of the stream
      */
    def read: F[Option[SslTlsInbound]] =
      F.liftIO(IO.fromFuture(IO(outputQueue.pull())))

    /**
      * Offers an outbound message to the TLS stage.
      *
      * The effect succeeds only if the queue reports the message as enqueued. It fails with the
      * reported cause on `QueueOfferResult.Failure`, and with an `IllegalStateException` when the
      * queue is closed or reports the message as dropped, so that lost messages are never
      * mistaken for successful writes.
      *
      * @param outbound message to send
      */
    def write(outbound: SslTlsOutbound): F[Unit] =
      F.liftIO(
        IO.fromFuture(IO(inputQueue.offer(outbound))).flatMap {
          case QueueOfferResult.Enqueued =>
            IO.unit
          case QueueOfferResult.Failure(cause) =>
            IO.raiseError(cause)
          case QueueOfferResult.QueueClosed =>
            IO.raiseError(new IllegalStateException("TLS outbound queue is closed"))
          case QueueOfferResult.Dropped =>
            IO.raiseError(new IllegalStateException("TLS outbound queue dropped the message"))
        }
      )

    /** Stream of inbound messages: evaluates [[read]] repeatedly and stops at the first `None`. */
    def reads: Stream[F, SslTlsInbound] =
      Stream.repeatEval(read).unNoneTerminate

    /** Sink that passes every element to [[write]], in order. */
    def writes: Sink[F, SslTlsOutbound] =
      outgoing => outgoing.evalMap(packet => write(packet))
  }
}
/** Adapters between fs2 streams/pipes and Akka Streams stages, bridged through Reactive Streams. */
object AkkaStreamsCompat {
  /**
    * An Akka `Sink` whose materialized value is an fs2 `Stream` of the elements the sink receives.
    *
    * The underlying publisher is created with `fanout = false`.
    */
  def streamSink[F[_] : ConcurrentEffect, T]: AkkaSink[T, Stream[F, T]] =
    AkkaSink.asPublisher[T](fanout = false)
      .mapMaterializedValue((publisher: Publisher[T]) => publisher.toStream[F]())

  /**
    * Exposes an fs2 `Stream` as an Akka `Source`.
    *
    * The unicast publisher is created inside `Source.lazily`, i.e. not when this method is called.
    *
    * @param stream the stream to publish
    */
  def streamToSource[F[_] : ConcurrentEffect, T](stream: Stream[F, T]): Source[T, NotUsed] =
    Source.lazily(() => Source.fromPublisher(stream.toUnicastPublisher()))
      .mapMaterializedValue(_ => NotUsed)

  /**
    * Exposes an fs2 `Pipe` as an Akka `Flow`.
    *
    * The pipe is wired up as part of every materialization of the returned flow rather than on
    * the first incoming element. A pipe that has to emit before it receives anything (for
    * example a network transport whose peer speaks first) would otherwise never be started.
    *
    * @param pipe         the pipe to run between the flow's inlet and outlet
    * @param materializer retained for source compatibility; not used by this implementation
    */
  def pipeToFlow[F[_] : ConcurrentEffect, I, O](pipe: Pipe[F, I, O])
                                               (implicit materializer: Materializer): Flow[I, O, NotUsed] =
    Flow.fromSinkAndSourceMat(streamSink[F, I], Source.asSubscriber[O])(Keep.both)
      .mapMaterializedValue { case (incoming, subscriber) =>
        // Feed the flow's input through the pipe and publish the pipe's output to the
        // subscriber that backs the flow's outlet.
        pipe(incoming).toUnicastPublisher().subscribe(subscriber)
        NotUsed
      }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment