Last active
December 6, 2016 19:51
-
-
Save ktoso/b0e6c4a34cd036839a991dae21d06c60 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor.ActorSystem | |
import akka.http.scaladsl.Http | |
import akka.http.scaladsl.model._ | |
import akka.stream.Materializer | |
import scala.concurrent.{Future, Promise} | |
import scala.concurrent.duration.FiniteDuration | |
import scala.util.{Failure, Success} | |
/**
 * Exception type for an HTTP response that did not arrive in time.
 *
 * @param request the request this failure refers to
 * @param message human-readable description, also used as the exception message
 */
final case class ResponseTimeoutException(request: HttpRequest, message: String)
  extends RuntimeException(message)
/**
 * Thin wrapper around `Http().singleRequest` that adds a per-request response timeout.
 *
 * @param system actor system providing the HTTP extension, scheduler, logger and dispatcher
 * @param mat    materializer used to run the request and to drain entities of late responses
 */
final class FancyClientTimeouts(implicit system: ActorSystem, mat: Materializer) {
  // The scheduler and the Future callbacks below require an implicit ExecutionContext.
  import system.dispatcher

  val http = Http(system)

  /**
   * Sends `request` and fails the returned future if no response arrives within `responseTimeout`.
   *
   * @param request         the request to send
   * @param responseTimeout how long to wait for the response before failing the returned future
   * @return a future completed with the response, failed with the underlying request failure,
   *         or failed with a [[ResponseTimeoutException]] when the timeout fires first
   */
  def singleRequestWithTimeout(request: HttpRequest, responseTimeout: FiniteDuration): Future[HttpResponse] = {
    val response = http.singleRequest(request)
    val p = Promise[HttpResponse]()

    // allocating the failure here, since this way the stacktrace will point to where this was called from
    val timeoutException =
      ResponseTimeoutException(request, s"Response did not arrive within $responseTimeout")

    // fails the promise once the timeout elapses, unless it has been completed before that
    val timeoutCancellable = system.scheduler.scheduleOnce(responseTimeout, new Runnable {
      override def run(): Unit = p.tryFailure(timeoutException)
    })

    // This handler is the ONLY place that completes the promise from the response. Completing it
    // from a second callback as well would race with this one and could end up draining the
    // entity of a response that was already handed to the caller.
    response.onComplete {
      case Success(res) =>
        timeoutCancellable.cancel()
        // we attempt to succeed the promise in one atomic step (no separate isCompleted check)
        if (!p.trySuccess(res)) {
          system.log.debug("Draining entity body for late response [{} {}], which was completed with timeout ({}) failure already.", request.method, request.uri, responseTimeout)
          res.discardEntityBytes() // we discard the databytes explicitly, to avoid stalling the connection
        }

      case Failure(failed) =>
        timeoutCancellable.cancel()
        p.tryFailure(failed)
    }

    p.future
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment