-
-
Save rklaehn/3f26c3f80e5870831f52 to your computer and use it in GitHub Desktop.
package akkahttptest | |
import akka.http.Http | |
import akka.stream.ActorFlowMaterializer | |
import akka.actor.ActorSystem | |
import akka.stream.scaladsl.{Sink, Source} | |
import akka.http.model._ | |
/**
 * Tiny manual test client.
 *
 * Sends one default `HttpRequest` over an outgoing connection to 127.0.0.1:80. For an OK
 * response the entity bytes are written to standard output chunk by chunk; for any other
 * status only the status itself is printed (the entity is not read in that case).
 * The program then blocks on `System.in.read()` before shutting the actor system down.
 */
object TestClient extends App {
  implicit val system = ActorSystem("ServerTest")
  implicit val materializer = ActorFlowMaterializer()

  val host = "127.0.0.1"
  val httpClient = Http(system).outgoingConnection(host, 80)

  // Sink that handles each response: dump the body for OK, otherwise print the status.
  val printChunksConsumer = Sink.foreach[HttpResponse] { response =>
    response.status match {
      case StatusCodes.OK =>
        // Write every chunk straight to stdout and flush so output appears as it arrives.
        val echoed = response.entity.getDataBytes().map { bytes =>
          System.out.write(bytes.toArray)
          System.out.flush()
        }
        echoed.to(Sink.ignore).run()
      case other =>
        println(other)
    }
  }

  val finishFuture =
    Source
      .single(HttpRequest())
      .via(httpClient)
      .runWith(printChunksConsumer)

  // Keep the process alive until input arrives on stdin, then tear everything down.
  System.in.read()
  system.shutdown()
  system.awaitTermination()
}
package akkahttptest | |
import java.nio.channels.FileChannel | |
import java.nio.file.{Path, Paths, StandardOpenOption} | |
import java.nio.{ByteBuffer, MappedByteBuffer} | |
import akka.actor.ActorSystem | |
import akka.http.Http | |
import akka.http.model.HttpEntity.ChunkStreamPart | |
import akka.http.model._ | |
import akka.stream.ActorFlowMaterializer | |
import akka.stream.scaladsl.{Sink, Source} | |
import akka.util.{ByteString, Timeout} | |
import scala.concurrent.Future | |
import scala.concurrent.duration._ | |
import scala.util.Try | |
import scala.util.control.NonFatal | |
/**
 * Iterates over the remaining bytes of a read-only `ByteBuffer`, yielding them as `ByteString`
 * chunks of at most `chunkSize` bytes each (the final chunk may be shorter).
 *
 * Iteration advances the position of the supplied `buffer`.
 *
 * @param buffer    read-only buffer to read from, starting at its current position
 * @param chunkSize maximum number of bytes per chunk; must be positive
 * @throws IllegalArgumentException if `buffer` is not read-only or `chunkSize` is not positive
 */
class ByteBufferIterator(buffer: ByteBuffer, chunkSize: Int) extends Iterator[ByteString] {
  require(buffer.isReadOnly, "buffer must be read-only")
  require(chunkSize > 0, "chunkSize must be positive")

  override def hasNext: Boolean = buffer.hasRemaining

  /**
   * Returns the next chunk and advances the underlying buffer past it.
   *
   * @throws NoSuchElementException if the buffer has no bytes remaining
   */
  override def next(): ByteString = {
    // Honour the Iterator contract: an exhausted iterator must fail rather than
    // hand out an endless supply of empty chunks.
    if (!buffer.hasRemaining)
      throw new NoSuchElementException("next() called on exhausted ByteBufferIterator")

    val size = chunkSize min buffer.remaining()
    // The slice starts at the current position; capping its limit gives a view of exactly one chunk.
    val temp = buffer.slice()
    temp.limit(size)
    buffer.position(buffer.position() + size)
    ByteString(temp)
  }
}
/**
 * Minimal HTTP server that answers GET requests by streaming the file named by the request
 * path as a chunked `application/octet-stream` response, 4096 bytes per chunk.
 *
 * Binds to localhost:8080, blocks on `System.in.read()`, then shuts the actor system down.
 */
object Main extends App {

  /**
   * Memory-maps the whole file at `path` in read-only mode.
   *
   * The channel is always closed before returning, including when mapping fails.
   *
   * @param path file to map
   * @return a read-only mapping covering the entire file
   */
  def map(path: Path): MappedByteBuffer = {
    val channel = FileChannel.open(path, StandardOpenOption.READ)
    // `finally` releases the file handle even if `map` throws. Per the FileChannel.map
    // documentation, an established mapping does not depend on the channel staying open.
    try channel.map(FileChannel.MapMode.READ_ONLY, 0L, channel.size())
    finally channel.close()
  }

  implicit val system = ActorSystem()
  implicit val materializer = ActorFlowMaterializer()
  implicit val askTimeout: Timeout = 500.millis

  import HttpMethods._

  /**
   * Synchronous request handler: GET requests stream the requested file, any failure while
   * preparing the response yields a 500 with the error text, everything else yields a 404.
   */
  val requestHandler: HttpRequest => HttpResponse = {
    case HttpRequest(GET, uri, _, _, _) =>
      val result = Try {
        // NOTE(review): the request path is used verbatim as a filesystem path, so any file
        // readable by this process can be fetched. Acceptable for a local demo only; restrict
        // to a base directory before exposing this to untrusted clients.
        // Resolved inside the Try so that a malformed path also produces the 500 response.
        val path = Paths.get(uri.path.toString())
        val mappedByteBuffer = map(path)
        val iterator = new ByteBufferIterator(mappedByteBuffer, 4096)
        val chunks = Source(() => iterator).map { x =>
          println(s"Chunk of size ${x.size}")
          ChunkStreamPart(x)
        }
        HttpResponse(entity = HttpEntity.Chunked(MediaTypes.`application/octet-stream`, chunks))
      } recover {
        case NonFatal(cause) =>
          // getMessage may be null for some exceptions; fall back to the exception's toString.
          val message = Option(cause.getMessage).getOrElse(cause.toString)
          HttpResponse(StatusCodes.InternalServerError, entity = message)
      }
      // Non-fatal failures were converted to a response above; fatal errors still propagate.
      result.get
    case _: HttpRequest => HttpResponse(StatusCodes.NotFound, entity = "Unknown resource!")
  }

  val serverSource: Source[Http.IncomingConnection, Future[Http.ServerBinding]] =
    Http(system).bind(interface = "localhost", port = 8080)

  val bindingFuture: Future[Http.ServerBinding] = serverSource.to(Sink.foreach { connection =>
    // foreach materializes the source
    println(s"Accepted new connection from ${connection.remoteAddress}")
    // ... and then actually handle the connection
    connection.handleWithSyncHandler(requestHandler)
  }).run()

  // Keep serving until input arrives on stdin, then tear everything down.
  System.in.read()
  system.shutdown()
  system.awaitTermination()
}
Which version of akka-http are you using? I am using 0.10-M1
I was using 0.11. Maybe that is why.
I just forked this with a few edits for 0.11 (my first time using Gist, please don't hate on me! :) Not sure what to do with the `Sink.future`. I also forked a client at https://github.com/topping/akka-http-stream-example/blob/master/akka-http-example/src/main/scala/HttpClient.scala, but it also has errors. Hmm!
It seems like `Sink.future` is now called `Sink.head`, since it just uses the first element (head) of the stream. And `connect` is now called `to`. That should help you fix your compile errors.
Here is a modified version of your test client and server that work with 0.11:
https://gist.github.com/abrighton/acd43a6cd9c0b997c456
The chunking part seems to work. You just need to be careful not to shut down before it is done.
I updated the example to work with 1.0-M2
Thanks for the example!
In the last line, I had to replace "connect" with "to" to get it to compile.