Last active
October 17, 2018 14:43
-
-
Save eyalroth/9864170d7e2aae06e643192998a60f2c to your computer and use it in GitHub Desktop.
akka-http #2257 - Http Client
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor.ActorSystem | |
import akka.http.scaladsl.coding.Gzip | |
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, ResponseEntity, StatusCode} | |
import akka.http.scaladsl.unmarshalling.{Unmarshal, Unmarshaller} | |
import akka.http.scaladsl.{ConnectionContext, Http} | |
import akka.stream._ | |
import akka.stream.scaladsl.{Keep, Sink, Source} | |
import com.typesafe.scalalogging.StrictLogging | |
import com.typesafe.sslconfig.akka.AkkaSSLConfig | |
import javax.net.ssl._ | |
import scala.concurrent.duration._ | |
import scala.concurrent.{Future, Promise} | |
import scala.util.control.NonFatal | |
import scala.util.{Failure, Success} | |
/**
 * HTTP client bound to a single `host:port` that funnels every request through a bounded
 * in-memory queue feeding a cached akka-http host connection pool.
 *
 * Each request is paired with a `Promise[HttpResponse]`; the stream completes that promise with
 * the outcome produced by the pool, and callers receive the promise's future.
 *
 * @param host         target host name
 * @param port         target port
 * @param queueSize    buffer size of the request queue (created with `OverflowStrategy.backpressure`)
 * @param secure       when `true` an HTTPS connection pool is used, otherwise plain HTTP
 * @param materializer materializer used to run the queue stream and to buffer entities; its
 *                     execution context is exposed as the implicit `ec`
 */
class HttpClient(val host: String, val port: Int, queueSize: Int, secure: Boolean)(
    implicit val materializer: ActorMaterializer)
    extends StrictLogging {

  /* --- Data Members --- */

  // Explicit type on a public implicit: keeps the API stable and implicit resolution predictable.
  implicit val ec: scala.concurrent.ExecutionContextExecutor = materializer.executionContext

  // Connection pool flow for `host:port`; every element carries the promise to complete.
  private val cachedConnectionFlow = {
    if (secure) {
      // NOTE(review): judging by their names, AcceptAllHostnameVerifier and AcceptAllX509TrustManager
      // (both defined outside this file) appear to disable hostname and certificate verification.
      // If so, HTTPS connections made here are open to man-in-the-middle attacks -- confirm this is
      // intended before using the client outside a trusted network.
      val sslConfig = AkkaSSLConfig.get(materializer.system).mapSettings { settings =>
        settings.withHostnameVerifierClass(classOf[AcceptAllHostnameVerifier].asInstanceOf[Class[HostnameVerifier]])
      }
      val sslContext = SSLContext.getInstance("TLS")
      sslContext.init(Array[KeyManager](), Array(AcceptAllX509TrustManager), null)
      val httpsContext = ConnectionContext.https(sslContext, Some(sslConfig))
      Http()(materializer.system).cachedHostConnectionPoolHttps[Promise[HttpResponse]](host, port, httpsContext)
    } else {
      Http()(materializer.system).cachedHostConnectionPool[Promise[HttpResponse]](host, port)
    }
  }

  // Materialized once at construction time: offered requests flow through the pool and the sink
  // completes the promise that travelled alongside each request.
  private val queue =
    Source
      .queue[(HttpRequest, Promise[HttpResponse])](queueSize, OverflowStrategy.backpressure)
      .via(cachedConnectionFlow)
      .toMat(Sink.foreach({
        case ((Success(resp), p)) => p.success(resp)
        case ((Failure(e), p))    => p.failure(e)
      }))(Keep.left)
      .run()

  /* --- Constructors --- */

  /**
   * Builds the client with a materializer that runs on the dispatcher configured under
   * `akka.dispatchers.<dispatcherName>`.
   */
  def this(host: String, port: Int, queueSize: Int, secure: Boolean, system: ActorSystem, dispatcherName: String) = {
    this(host, port, queueSize, secure)(
      ActorMaterializer(ActorMaterializerSettings(system).withDispatcher(s"akka.dispatchers.$dispatcherName"))(system))
  }

  /* --- Methods --- */

  /* --- Public Methods --- */

  /** `"https"` when the client is secure, `"http"` otherwise. */
  val scheme: String = if (secure) "https" else "http"

  /** Base URI of the remote API: `scheme://host:port/api/v1`. */
  val baseUri: String = s"$scheme://$host:$port/api/v1"

  /**
   * Gzip-encodes the request, offers it to the queue and returns the eventual response.
   *
   * A response whose status is a failure and is not listed in `ignoredStatusCodes` is logged
   * together with the request and response bodies. To do that safely the response entity is first
   * buffered in memory and the returned response carries the buffered entity, so the caller can
   * still read the body after it has been logged.
   *
   * @param request            request to send
   * @param ignoredStatusCodes failure status codes that must not be logged as errors
   * @return the response; a failed future when the request could not be enqueued or the request
   *         itself failed
   */
  def queueRequest(request: HttpRequest, ignoredStatusCodes: Set[StatusCode] = Set()): Future[HttpResponse] = {
    val responsePromise = Promise[HttpResponse]()

    queue
      .offer(Gzip.encodeMessage(request) -> responsePromise)
      .map {
        case QueueOfferResult.Enqueued => responsePromise
        case QueueOfferResult.Dropped =>
          responsePromise failure new RuntimeException("Queue overflowed. Try again later.")
        case QueueOfferResult.QueueClosed =>
          responsePromise failure new RuntimeException(
            "Queue was closed (pool shut down) while running the request. Try again later.")
        case QueueOfferResult.Failure(ex) => responsePromise failure ex
      }
      .flatMap(_.future)
      .flatMap { response =>
        val badStatus = response.status.isFailure() && !ignoredStatusCodes.contains(response.status)
        if (badStatus) bufferAndLogBadStatus(request, response) else Future.successful(response)
      }
      .andThen {
        case Failure(e) =>
          // Fire-and-forget: logging must not delay or alter the failure seen by the caller.
          getBody(request.entity, Some(1000)).foreach { body =>
            logger.error(s"HTTP request to the server failed. URI: ${request.uri}. Body: $body", e)
          }
      }
  }

  /**
   * Sends the request through [[queueRequest]] and unmarshals the response to `A`.
   *
   * Unmarshalling failures are logged (with the request body) and then propagated unchanged.
   */
  def queueAndUnmarshal[A](request: HttpRequest, ignoredStatusCodes: Set[StatusCode] = Set())(
      implicit um: Unmarshaller[HttpResponse, A]): Future[A] = {
    queueRequest(request, ignoredStatusCodes).flatMap { response =>
      Unmarshal(response).to[A].recoverWith {
        case NonFatal(e) =>
          getBody(request.entity, Some(1000)).foreach { body =>
            logger.error(s"Failed to unmarshal response. URI: ${request.uri}. Body: $body", e)
          }
          Future.failed(e)
      }
    }
  }

  /* --- Private Methods --- */

  /**
   * Buffers the entity of a bad-status response, logs it, and returns the response with the
   * buffered entity.
   *
   * The entity is read exactly once here; handing the original (possibly streamed) entity to the
   * caller after reading it for the log would leave the caller with an already-consumed stream.
   * If buffering fails, the failure is logged and the original response is returned as-is.
   */
  private def bufferAndLogBadStatus(request: HttpRequest, response: HttpResponse): Future[HttpResponse] = {
    response.entity
      .toStrict(300.millis)(materializer)
      .map(strict => Option(strict))
      .recover {
        case NonFatal(e) =>
          logger.error("Failed to get body", e)
          None
      }
      .map { bufferedEntity =>
        val responseBody = bufferedEntity.fold("NaN (failed to get body)")(_.data.utf8String)
        // The request body is only needed for the log line, so do not make the caller wait for it.
        getBody(request.entity, Some(1000)).foreach { requestBody =>
          logger.error(
            s"HTTP request to the server returned bad status. Code: ${response.status}. URI: ${request.uri}. Request Body: $requestBody. Response Body: $responseBody")
        }
        bufferedEntity.fold(response)(strict => response.withEntity(strict))
      }
  }

  /**
   * Reads an entity as a UTF-8 string for logging purposes.
   *
   * @param entity  entity to read (buffered with a 300 ms timeout)
   * @param maxSize when defined, the maximum number of characters kept; longer bodies are cut and
   *                suffixed with `" ... (truncated)"`
   * @return the (possibly truncated) body, or a placeholder string when the body could not be
   *         read; the returned future does not fail for non-fatal errors
   */
  private def getBody(entity: ResponseEntity, maxSize: Option[Int] = None): Future[String] = {
    entity
      .toStrict(300.millis)(materializer)
      .map { strict =>
        val body = strict.data.utf8String
        maxSize match {
          case Some(limit) if body.length > limit => body.take(limit) + " ... (truncated)"
          case _                                  => body
        }
      }
      .recover {
        case NonFatal(e) =>
          logger.error("Failed to get body", e)
          "NaN (failed to get body)"
      }
  }
}
/** Factory for [[HttpClient]] instances driven by configuration. */
object HttpClient {

  /* --- Constructors --- */

  /**
   * Creates a client from the given configuration section.
   *
   * Keys read, in order: `host`, `port`, `requests-queue-size`, the optional boolean `secure`
   * (treated as `true` when absent) and `dispatcher`.
   *
   * @param config configuration section holding the keys above
   * @param system actor system passed on to the client's constructor
   */
  def apply(config: TypesafeConfiguration, system: ActorSystem): HttpClient =
    new HttpClient(
      host = config.getString("host"),
      port = config.getInt("port"),
      queueSize = config.getInt("requests-queue-size"),
      secure = config.getOptional[Boolean]("secure").getOrElse(true),
      system = system,
      dispatcherName = config.getString("dispatcher")
    )
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment