Created
May 4, 2016 13:11
-
-
Save davidmweber/8c8f0a5ce8af61b6cbfc4f3ab1e9b73b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Copyright © 2016 8eo Inc.
 */
package ws | |
import akka.actor.ActorSystem | |
import akka.http.scaladsl.Http | |
import akka.http.scaladsl.client.RequestBuilding._ | |
import akka.http.scaladsl.model.{HttpRequest, StatusCodes} | |
import akka.http.scaladsl.server.Directives._ | |
import akka.http.scaladsl.settings.ConnectionPoolSettings | |
import akka.stream.{ActorAttributes, ActorMaterializer, Supervision} | |
import akka.stream.scaladsl.{Flow, Sink, Source} | |
import scala.concurrent._ | |
import scala.concurrent.ExecutionContext.Implicits.global | |
import scala.concurrent.duration._ | |
import scala.util.{Failure, Success} | |
/**
 * Minimal reproduction of a timeout-handling problem with a pooled Akka HTTP client flow.
 *
 * The program binds a local HTTP server exposing `GET /slow` (which sleeps for two seconds
 * before answering), sends one request to it through a cached host connection pool flow
 * guarded by a one second `completionTimeout`, prints the outcome, and finally unbinds the
 * server and terminates the actor system.
 *
 * The inline "never gets here" / "never catches" remarks are the original author's
 * observations of the behaviour being reported; they are kept as the point of the reproduction.
 */
object TimeoutBug extends App {

  implicit val system: ActorSystem = ActorSystem("Test")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  val host = "localhost"
  val port = 10101

  // Just a slow response: the handler sleeps longer than the client-side timeout below.
  val route = get {
    path("slow") {
      Thread.sleep(2000)
      complete(StatusCodes.OK)
    }
  }

  // Logs whatever error reaches the supervision strategy and stops the stream.
  val decider: Supervision.Decider = {
    case error =>
      println(s"Supervisor received an error: $error") // Never gets here either
      Supervision.Stop
  }

  // Client flow under test: connection pool, then a 1 s completion timeout, with the decider attached.
  val reqFlow = Flow[(HttpRequest, Int)]
    .via(Http().cachedHostConnectionPool[Int](host, port, ConnectionPoolSettings(system)))
    .completionTimeout(1.second) // TimeoutExceptions are not caught properly
    .withAttributes(ActorAttributes.supervisionStrategy(decider))

  val server = Http().bindAndHandle(route, host, port)
  Await.result(server, 1.second)

  // Send a single request and unwrap the pool's Try[HttpResponse] into the Future's own outcome.
  val req = Source.single(Get("/slow") -> 0)
    .via(reqFlow)
    .runWith(Sink.head)
    .flatMap {
      case (Success(s), _) => Future.successful(s)
      case (Failure(f), _) => Future.failed(f)
    }

  req.onComplete {
    case Failure(error) => println(s"Failure was $error") // Never catches the TimeoutException
    case Success(r)     => println(s"Success: ${r.status}")
  }

  // Await.result rethrows when the request fails or the wait itself times out. The cleanup
  // therefore lives in `finally`; otherwise the server stays bound and the actor system
  // keeps the JVM alive after the main thread has died.
  try {
    Await.result(req, 5.seconds)
  } finally {
    // Terminate the system whether or not binding/unbinding succeeded.
    server.flatMap(_.unbind()).onComplete(_ => system.terminate())
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment