Created
May 18, 2020 18:29
-
-
Save Daenyth/4c5aa3bafe1d85909437eb93b0cea3e1 to your computer and use it in GitHub Desktop.
POC akka-http to http4s layer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package akkahttp4s | |
import akka.http.scaladsl.model.{ | |
ContentType, | |
ContentTypes, | |
HttpEntity, | |
HttpHeader, | |
HttpMethod, | |
HttpMethods, | |
HttpProtocol, | |
HttpProtocols, | |
HttpResponse, | |
ParsingException, | |
ResponseEntity, | |
Uri => AkkaUri | |
} | |
import akka.http.scaladsl.server.{ | |
Rejection, | |
RequestContext, | |
RouteResult, | |
Route => AkkaRoute | |
} | |
import akka.stream.Materializer | |
import akka.stream.scaladsl.Source | |
import cats.data.{NonEmptyChain, OptionT, ValidatedNec} | |
import cats.effect.implicits._ | |
import cats.effect.{ConcurrentEffect, ContextShift} | |
import cats.implicits._ | |
import fs2.Stream | |
import org.http4s.headers.`Content-Type` | |
import org.http4s.syntax.all._ | |
import org.http4s.{ | |
Header, | |
Headers, | |
HttpRoutes, | |
HttpVersion, | |
Method, | |
Request, | |
Response, | |
Uri | |
} | |
import streamz.converter._ | |
import Fs2AkkaCompat | |
import scala.collection.immutable | |
import scala.util.control.NoStackTrace | |
/** Factory for exposing http4s routes as an akka-http route. */
object AkkaHttp4s {

  /** Builds an [[AkkaHttp4s]] adapter around `http4sRoutes` and returns its akka-http route.
    *
    * @param http4sRoutes the http4s routes that will handle the requests
    * @param mat          materializer handed to the adapter
    */
  def apply[F[_]: ConcurrentEffect: ContextShift](
      http4sRoutes: HttpRoutes[F]
  )(implicit mat: Materializer): AkkaRoute = {
    val adapter = new AkkaHttp4s[F](http4sRoutes)
    adapter.akkaRoute
  }
}
/** Adapter that lets http4s `HttpRoutes[F]` be served as an akka-http route.
  *
  * Every akka-http request is converted to an http4s `Request[F]`, run through
  * `http4sRoutes`, and the http4s `Response[F]` is converted back into an akka-http
  * `HttpResponse`. Conversion problems are accumulated and raised in `F` as
  * `AkkaToHttp4sFailed` (request side) or `Http4sToAkkaFailed` (response side).
  *
  * @param http4sRoutes the http4s routes that handle the converted requests
  * @param mat          materializer used when bridging akka streams and fs2 streams
  */
class AkkaHttp4s[F[_]: ConcurrentEffect: ContextShift](
    http4sRoutes: HttpRoutes[F]
)(
    implicit mat: Materializer
) {

  /** Result of a conversion step: either the value or every error collected so far. */
  private type OrErr[A] = ValidatedNec[Throwable, A]

  /** The akka-http route backed by `http4sRoutes`.
    *
    * When the http4s routes do not match the request, the route result is a rejection
    * with an empty rejection list.
    */
  def akkaRoute: AkkaRoute = { req =>
    handleRequest(req) // TODO convert http4s exceptions to Rejections?
      .getOrElse(RouteResult.Rejected(immutable.Seq.empty[Rejection])) // TODO what does akka-http give for 404?
      .toIO
      .unsafeToFuture()
  }

  /** Converts the request, runs the http4s routes and converts the response (if any). */
  private def handleRequest(req: RequestContext): OptionT[F, RouteResult] =
    OptionT
      .liftF(convertRequest(req))
      .flatMap(http4sRoutes.run)
      .semiflatMap(convertResponse)

  /** Builds the http4s request for an akka-http request context.
    *
    * Method and URI conversion errors are accumulated and raised as `AkkaToHttp4sFailed`.
    */
  private def convertRequest(req: RequestContext): F[Request[F]] =
    // TODO does request.uri work if the akka route nests path sections? Should use `req.unmatchedPath`?
    (
      methodFrom(req.request.method),
      convertUri(req.request.uri)
    ).mapN { (method, uri) =>
        val entity: Stream[F, Byte] =
          req.request.entity
            .getDataBytes()
            .toStream()
            .through(Fs2AkkaCompat.fromByteString)
        Request(method,
                uri,
                httpVersionFrom(req.request.httpMessage.protocol),
                headersFrom(req.request.headers),
                entity)
      }
      .leftMap(AkkaToHttp4sFailed(_, req): Throwable)
      .liftTo[F]

  /** Maps an akka-http method to the http4s one; non-standard methods are parsed by name. */
  private def methodFrom(method: HttpMethod): OrErr[Method] = method match {
    case HttpMethods.CONNECT => Method.CONNECT.validNec
    case HttpMethods.DELETE  => Method.DELETE.validNec
    case HttpMethods.GET     => Method.GET.validNec
    case HttpMethods.HEAD    => Method.HEAD.validNec
    case HttpMethods.OPTIONS => Method.OPTIONS.validNec
    case HttpMethods.PATCH   => Method.PATCH.validNec
    case HttpMethods.POST    => Method.POST.validNec
    case HttpMethods.PUT     => Method.PUT.validNec
    case HttpMethods.TRACE   => Method.TRACE.validNec
    // HTTP method names are case-sensitive, so the name is passed through unchanged
    // (upper-casing it would silently turn a custom method into a different one).
    case other => Method.fromString(other.value).toValidatedNec
  }

  /** Re-parses the rendered akka-http URI as an http4s URI. */
  private def convertUri(akkaUri: AkkaUri): OrErr[Uri] =
    Uri.fromString(akkaUri.toString()).toValidatedNec

  /** Maps an akka-http protocol to the http4s version. */
  private def httpVersionFrom(protocol: HttpProtocol): HttpVersion =
    protocol match {
      case HttpProtocols.`HTTP/1.0` => HttpVersion.`HTTP/1.0`
      case HttpProtocols.`HTTP/1.1` => HttpVersion.`HTTP/1.1`
      case HttpProtocols.`HTTP/2.0` => HttpVersion.`HTTP/2.0`
      case other =>
        // The type permits this, but it's `final` with a private constructor,
        // so in practice this code path is impossible
        sys.error(s"impossible http version: $other")
    }

  /** Maps an http4s version to the akka-http protocol.
    *
    * Only the three versions akka-http models are accepted; any other http4s version is
    * reported as a conversion error instead of being thrown.
    * NOTE(review): http4s appears to allow other versions to be constructed
    * (e.g. via `HttpVersion.fromVersion`) - confirm against the http4s version in use.
    */
  private def httpVersionTo(ver: HttpVersion): OrErr[HttpProtocol] = ver match {
    case HttpVersion.`HTTP/1.0` => HttpProtocols.`HTTP/1.0`.validNec
    case HttpVersion.`HTTP/1.1` => HttpProtocols.`HTTP/1.1`.validNec
    case HttpVersion.`HTTP/2.0` => HttpProtocols.`HTTP/2.0`.validNec
    case other =>
      new IllegalArgumentException(s"http version not supported by akka-http: $other").invalidNec
  }

  /** Copies akka-http headers into http4s raw headers (name and value only). */
  private def headersFrom(
      headers: immutable.Seq[HttpHeader]
  ): Headers = {
    def hdr(h: HttpHeader): Header = Header.Raw.apply(h.name.ci, h.value)
    Headers(headers.map(hdr).toList)
  }

  /** Parses every http4s header with akka-http's header parser, accumulating parse errors. */
  private def headersTo(headers: Headers): OrErr[immutable.Seq[HttpHeader]] = {
    def hdr(h: Header): OrErr[HttpHeader] =
      HttpHeader.parse(h.name.value, h.value) match {
        case HttpHeader.ParsingResult.Ok(result, _) => result.validNec
        case HttpHeader.ParsingResult.Error(info) =>
          new Exception(info.format(withDetail = true)).invalidNec
      }
    headers.toList.traverse(hdr).map(_.toSeq)
  }

  /** Builds the akka-http route result for an http4s response.
    *
    * Entity, header and protocol conversion errors are accumulated and raised as
    * `Http4sToAkkaFailed`.
    */
  private def convertResponse(resp: Response[F]): F[RouteResult] =
    (
      httpEntity(resp),
      headersTo(resp.headers),
      httpVersionTo(resp.httpVersion)
    ).mapN { (entity, headers, protocol) =>
        val akkaResp = HttpResponse(
          // NOTE(review): relies on an implicit Int => StatusCode conversion;
          // confirm how it behaves for status codes akka-http does not know.
          status = resp.status.code,
          headers = headers,
          entity = entity,
          protocol = protocol
        )
        RouteResult.Complete(akkaResp): RouteResult
      }
      .leftMap(Http4sToAkkaFailed(_, resp))
      .liftTo[F]

  /** Wraps the http4s response body in an akka-http entity.
    *
    * Uses the response's content type when present, otherwise `application/octet-stream`.
    * NOTE(review): the entity is always close-delimited, even if the response declares a
    * length - confirm this is acceptable for connection reuse.
    */
  private def httpEntity(resp: Response[F]): OrErr[ResponseEntity] = {
    val getContentType =
      resp.contentType
        .map(contentTypeTo)
        .getOrElse(ContentTypes.`application/octet-stream`.validNec)
    getContentType.map { contentType =>
      val body = Source.fromGraph(
        resp.body.through(Fs2AkkaCompat.byteToByteString).toSource)
      HttpEntity.CloseDelimited(contentType, body)
    }
  }

  /** Re-parses the http4s `Content-Type` header value as an akka-http content type. */
  private def contentTypeTo(ct: `Content-Type`): OrErr[ContentType] =
    ContentType
      .parse(ct.value)
      .leftMap { errs =>
        val msg = errs.map(_.format(withDetail = true))
        ParsingException(
          s"Failed to convert http4s Content-Type to akka-http: $msg"): Throwable
      }
      .toValidatedNec
}
/** Raised when an akka-http request could not be converted into an http4s request.
  *
  * The message lists every cause; the first cause is also attached as the exception cause.
  *
  * @param causes         all conversion errors that were collected
  * @param requestContext the akka-http request context that failed to convert
  */
final case class AkkaToHttp4sFailed(
    causes: NonEmptyChain[Throwable],
    requestContext: RequestContext
) extends Exception(
      s"Failed to convert Akka-Http request to Http4s: ${causes.map(_.toString).mkString_("[", ", ", "]")}",
      causes.head // arbitrary, but better than not attaching a cause
    )
    with NoStackTrace
/** Raised when an http4s response could not be converted into an akka-http response.
  *
  * The message lists every cause; the first cause is also attached as the exception cause
  * (an arbitrary pick, but better than attaching none).
  *
  * @param causes   all conversion errors that were collected
  * @param response the http4s response that failed to convert
  */
final case class Http4sToAkkaFailed[F[_]](
    causes: NonEmptyChain[Throwable],
    response: Response[F]
) extends Exception(
      s"Failed to convert Http4s response to Akka-Http: ${causes.map(_.toString).mkString_("[", ", ", "]")}",
      causes.head
    )
    with NoStackTrace
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package akkahttp4s | |
import akka.http.scaladsl.model.{ | |
ContentTypes, | |
HttpRequest, | |
HttpResponse, | |
ResponseEntity, | |
StatusCodes | |
} | |
import akka.http.scaladsl.server.{Route => AkkaRoute} | |
import akka.http.scaladsl.testkit.ScalatestRouteTest | |
import akka.http.scaladsl.unmarshalling.{Unmarshal, Unmarshaller} | |
import cats.effect.{Async, IO} | |
import cats.effect.concurrent.Semaphore | |
import cats.implicits._ | |
import fs2.Stream | |
import fs2.concurrent.SignallingRef | |
import io.circe.literal._ | |
import org.http4s.HttpRoutes | |
import org.http4s.dsl.Http4sDsl | |
import streamz.converter._ | |
import IOSpec // https://gist.github.com/Daenyth/67575575b5c1acc1d6ea100aae05b3a9 | |
import scala.concurrent.duration._ | |
/** Exercises the `AkkaHttp4s` adapter by running akka-http requests through routes that are
  * implemented with http4s and inspecting the akka-http responses.
  */
class AkkaHttp4sSpec extends IOSpec with Http4sDsl[IO] with ScalatestRouteTest {

  describe("simple requests") {
    it("transfers simple responses") {
      val msg = "hello world"
      val route = AkkaHttp4s(HttpRoutes.of[IO] {
        case GET -> Root => Ok(msg)
      })
      routeServer(route)(Get()).flatMap { resp =>
        resp.status shouldBe StatusCodes.OK
        unmarshall[String](resp.entity).shouldResultIn(msg)
      }
    }
  }

  describe("streaming") {
    it("returns stream responses") {
      import org.http4s.circe.CirceEntityEncoder._
      val route = AkkaHttp4s(HttpRoutes.of[IO] {
        case GET -> Root =>
          Ok(Stream.emit(json"""{"key":"value"}""").covary[IO])
      })
      routeServer(route)(Get()).flatMap { resp =>
        resp.status shouldBe StatusCodes.OK
        resp.entity.contentType shouldBe ContentTypes.`application/json`
        unmarshall[String](resp.entity).shouldResultIn("""{"key":"value"}""")
      }
    }

    it("streams lazily") {
      val test = for {
        stopResponse <- SignallingRef[IO, Boolean](false)
        lock <- Semaphore[IO](1)
        route: AkkaRoute = AkkaHttp4s(HttpRoutes.of[IO] {
          case GET -> Root =>
            Ok(
              Stream
                .eval(lock.acquire.as("locked"))
                .repeat // Ensure the stream won't terminate on its own - it must be interrupted
                .interruptWhen(stopResponse))
        })
        response <- routeServer(route)(Get())
        respEntity <- response.entity.dataBytes
          .toStream()
          .evalTap { _ =>
            // Once we get some response bytes, we can terminate the stream producing the server-side
            // response, allowing the request to complete.
            stopResponse.set(true)
          }
          .map(_.utf8String)
          .compile
          .lastOrError
      } yield {
        // If the response handling tries to fully produce the stream before returning a response, then we never get here
        respEntity shouldBe "locked"
      }
      test.timeoutTo(
        2.seconds,
        IO(fail("Response did not complete - stream was not lazily evaluated")))
    }
  }

  describe("errors") {
    it("404s for no match") {
      val route = AkkaHttp4s(HttpRoutes.of[IO](PartialFunction.empty))
      routeServer(route)(Get()).map { resp =>
        resp.status shouldBe StatusCodes.NotFound
      }
    }

    it("translates errors in route") {
      val route = AkkaHttp4s(HttpRoutes.of[IO] {
        case GET -> Root =>
          IO.raiseError[String](new Exception("boom")).flatMap(Ok(_))
      })
      val server = routeServer(route)
      server(Get()).map { response =>
        response.status shouldBe StatusCodes.InternalServerError
      }
    }

    // NOTE: known to fail. The effect producing the entity body isn't evaluated until after
    // the http response header is sent, so this route actually gives a 200 response.
    // A possible fix is a middleware that prefetches some bytes and translates an error to a 500.
    it("translates errors from EntityEncoder") {
      val route = AkkaHttp4s(HttpRoutes.of[IO] {
        case GET -> Root =>
          // NB this differs from the "errors in route" case because the error here is in the effect
          // producing the entity body rather than the effect producing the response itself
          Ok(IO.raiseError[String](new Exception("boom")))
      })
      val server = routeServer(route)
      server(Get()).flatMap { response =>
        response.status shouldBe StatusCodes.InternalServerError
        unmarshall[String](response.entity).shouldFail
      }
    }
  }

  /** Turns an akka-http route into a function that runs a request and yields the response in `IO`. */
  private def routeServer(route: AkkaRoute): HttpRequest => IO[HttpResponse] = {
    val server = AkkaRoute.asyncHandler(route)
    req => Async.fromFuture(IO(server(req)))
  }

  /** Unmarshals a response entity to `A` using akka-http's `Unmarshal`, lifted into `IO`. */
  private def unmarshall[A: Unmarshaller[ResponseEntity, *]](
      entity: ResponseEntity
  ): IO[A] =
    Async.fromFuture(IO(Unmarshal(entity).to[A]))
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package akkahttp4s | |
import akka.util.ByteString | |
import fs2._ | |
/** Pipes for moving byte data between fs2 byte streams and akka `ByteString`s. */
object Fs2AkkaCompat {

  /** Re-packs each chunk of the byte stream as a single `ByteString` element. */
  def byteToByteString[F[_]]: Pipe[F, Byte, ByteString] = { bytes =>
    bytes.mapChunks { chunk =>
      val raw: Array[Byte] = chunk.toBytes.toArray
      Chunk.singleton(ByteString.fromArray(raw))
    }
  }

  /** Flattens a stream of `ByteString`s into their bytes, emitting one chunk per `ByteString`. */
  def fromByteString[F[_]]: Pipe[F, ByteString, Byte] = { byteStrings =>
    byteStrings.flatMap { bs =>
      val raw: Array[Byte] = bs.toArray
      Stream.chunk(Chunk.bytes(raw))
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Note
translates errors from EntityEncoder
fails. The effect producing the entity body isn't evaluated until after the HTTP response header is sent, so this route gives a 200 response.
http://gitter.im/http4s/http4s?at=5e664988145f4d69562d65a7
note to self: use a middleware that prefetches N bytes and translates error to 500
http://stackoverflow.com/questions/50174849/akka-streams-best-practice-to-initialise-and-dispose-resources-of-a-sink
take the K[Resource[F, *], Request[F], Response[F]] and .allocated, then use those with watchTermination