Skip to content

Instantly share code, notes, and snippets.

@jbellenger
Created May 16, 2015 17:04
Show Gist options
  • Save jbellenger/8444ba094277e357525d to your computer and use it in GitHub Desktop.
Save jbellenger/8444ba094277e357525d to your computer and use it in GitHub Desktop.
Simple akka http client example
import akka.actor.{ActorSystem, Cancellable}
import akka.http.Http
import akka.http.model.{HttpRequest, HttpResponse, Uri}
import akka.http.unmarshalling.Unmarshal
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl._
import akka.util.ByteString
import scala.concurrent.duration._
/**
* A simple class for understanding how client http streams work
*/
object StreamsTest {
val uriSource: Source[Uri, Cancellable] =
Source(1.second, 1.second, Uri("http://www.google.com"))
// build a sink that consumes uris, requests them, and prints out part of their contents
def uriSink(implicit sys: ActorSystem, fm: ActorFlowMaterializer): Sink[Uri, Unit] = {
val http = Http(sys)
val requestFlow = Flow[Uri].mapAsync {uri =>
// setup an inner stream for each request/response
val req = HttpRequest(uri = uri)
val src = Source.single(req)
val connFlow = http.outgoingConnection(uri.authority.host.address(), uri.effectivePort)
val sink = Sink.head[HttpResponse]()
val result = src.via(connFlow).toMat(sink)(Keep.right).run()
result
}
// Unmarshalling collects all stream chunks into one byte string
// Without unmarshalling, a single http requests may emit multiple smaller ByteStrings
// ".to[ByteString]" requires an ActorFlowMaterializer, not just any FlowMaterializer
// see PredefinedFromEntityUnmarshallers.byteStringUnmarshaller in akka.http.unmarshalling
val respToByteString = Flow[HttpResponse].mapAsync(Unmarshal(_).to[ByteString])
// just print out the length of each response.
requestFlow.via(respToByteString).map(_.length).to(Sink.foreach(println))
}
def main(args: Array[String]): Unit = {
implicit val system = ActorSystem("streaming-http-client")
implicit val mat = ActorFlowMaterializer()
val cancel = uriSource.to(uriSink).run()
Thread.sleep(10000)
cancel.cancel()
system.shutdown()
system.awaitTermination()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment