Skip to content

Instantly share code, notes, and snippets.

@johanandren
Last active April 17, 2020 12:17
Show Gist options
  • Save johanandren/7683da950273dfc7f3fb to your computer and use it in GitHub Desktop.
Save johanandren/7683da950273dfc7f3fb to your computer and use it in GitHub Desktop.
Sample streaming response with akka http
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.marshalling.{Marshaller, ToResponseMarshaller}
import akka.http.scaladsl.model.HttpEntity.ChunkStreamPart
import akka.http.scaladsl.model.{HttpEntity, HttpResponse, MediaTypes}
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
/** Minimal akka-http server that streams an endless sequence of integers.
  *
  * `GET /numbers` is completed with a chunked `text/plain` response in which
  * every chunk is one integer followed by a newline. The server keeps running
  * until a line is read from standard input, then unbinds from the port and
  * shuts the actor system down.
  */
object StreamNumbers extends App {
  // Single source of truth for the bind address, used for both binding and the banner.
  private val bindHost = "localhost"
  private val bindPort = 8080

  implicit val system: ActorSystem = ActorSystem("my-system")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  /** Marshals a stream of integers into a chunked plain-text response:
    * each element becomes one chunk holding its decimal text plus "\n".
    */
  implicit val toResponseMarshaller: ToResponseMarshaller[Source[Int, Any]] =
    Marshaller.opaque { items =>
      val data = items.map(item => ChunkStreamPart(s"$item\n"))
      HttpResponse(entity = HttpEntity.Chunked(MediaTypes.`text/plain`, data))
    }

  /** Returns a fresh lazy stream of the integers 1, 2, 3, ... */
  def newDataStream(): Stream[Int] = Stream.from(1)

  val route =
    path("numbers") {
      get {
        complete {
          // The iterator factory builds a new stream on each invocation,
          // so no previously produced stream is reused.
          Source(() => newDataStream().toIterator)
        }
      }
    }

  val bindingFuture = Http().bindAndHandle(route, bindHost, bindPort)

  println(s"Server online at http://$bindHost:$bindPort/\nPress RETURN to stop...")
  // Blocks the main thread until a line is entered on stdin.
  // scala.io.StdIn replaces the deprecated Console.readLine().
  scala.io.StdIn.readLine()

  import system.dispatcher // execution context for the future transformations below

  // NOTE(review): `shutdown()` is kept to match the Akka API this file already uses;
  // newer Akka releases use `terminate()` instead — confirm before upgrading.
  bindingFuture
    .flatMap(_.unbind()) // trigger unbinding from the port
    .onComplete(_ => system.shutdown()) // and shut down when done (success or failure)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment