Last active
August 29, 2015 14:10
-
-
Save abrighton/acd43a6cd9c0b997c456 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package test | |
import java.io.FileOutputStream | |
import akka.actor.ActorSystem | |
import akka.http.Http | |
import akka.http.model.HttpMethods._ | |
import akka.http.model._ | |
import akka.io.IO | |
import akka.pattern.ask | |
import akka.stream.FlowMaterializer | |
import akka.stream.scaladsl.{ForeachSink, Sink, Source} | |
import akka.util.{ByteString, Timeout} | |
import scala.concurrent.Future | |
import scala.concurrent.duration._ | |
import scala.util.{Try, Failure, Success} | |
// The test client fetches http://localhost:8549/test.txt (the server serves it from /tmp/test.txt)
// and stores the response body in /tmp/out.txt.
/** Test HTTP client.
  *
  * Connects to `host:port`, issues a GET for `/$file`, and streams the response
  * body into `localFile`. The output file is closed and the actor system is shut
  * down on every outcome (success, non-OK status, or failure).
  */
object TestClient extends App {
  implicit val system: ActorSystem = ActorSystem("ServerTest")
  import system.dispatcher
  implicit val materializer: FlowMaterializer = FlowMaterializer()
  implicit val askTimeout: Timeout = 5000.millis

  // Server info
  val host = "localhost"
  val port = 8549

  // File under /tmp to get
  val file = "test.txt"

  // The local file in which to store the received data
  val localFile = "/tmp/out.txt"
  val out = new FileOutputStream(localFile)

  // Writes each received chunk to the local file as it arrives.
  val sink = ForeachSink[ByteString] { bytes =>
    println(s"XXX writing ${bytes.size} bytes to $localFile")
    out.write(bytes.toArray)
  }

  // Open the connection, then send the single GET request over it.
  val result = for {
    connection <- IO(Http).ask(Http.Connect(host, port)).mapTo[Http.OutgoingConnection]
    response <- sendRequest(HttpRequest(GET, uri = s"/$file"), connection)
  } yield response

  /** Sends `request` over `connection` and returns the first response published on it.
    *
    * @param request    the HTTP request to send
    * @param connection an established outgoing connection
    * @return a future completed with the first response from the connection
    */
  def sendRequest(request: HttpRequest, connection: Http.OutgoingConnection): Future[HttpResponse] = {
    // Feed the request (paired with a dummy context value) into the connection.
    Source(List(request -> 'NoContext))
      .to(Sink(connection.requestSubscriber))
      .run()
    // Responses arrive as (response, context) pairs; only the response is needed.
    Source(connection.responsePublisher).map(_._1).runWith(Sink.head)
  }

  /** Closes the output file (best effort) and stops the actor system.
    *
    * Used by every completion branch so the file handle is never left open.
    */
  private def closeAndShutdown(): Unit = {
    Try(out.close())
    system.shutdown()
  }

  result onComplete {
    case Success(res) if res.status == StatusCodes.OK =>
      val materialized = res.entity.getDataBytes().to(sink).run()
      // Ensure the output file is closed (otherwise some bytes may not be written).
      materialized.get(sink).onComplete {
        case Success(_) =>
          println("Success: closing the file")
          closeAndShutdown()
        case Failure(e) =>
          println(s"Failure: ${e.getMessage}")
          closeAndShutdown()
      }
    case Success(res) =>
      println(s"Got HTTP response code ${res.status}")
      closeAndShutdown()
    case Failure(error) =>
      println(s"Error: $error")
      closeAndShutdown()
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package test | |
import java.io.File | |
import java.nio.channels.FileChannel | |
import java.nio.file.{Path, Paths, StandardOpenOption} | |
import java.nio.{ByteBuffer, MappedByteBuffer} | |
import akka.actor.ActorSystem | |
import akka.http.Http | |
import akka.http.model.HttpEntity.ChunkStreamPart | |
import akka.http.model._ | |
import akka.io.IO | |
import akka.pattern.ask | |
import akka.stream.FlowMaterializer | |
import akka.stream.scaladsl.{Sink, Source} | |
import akka.util.{ByteString, Timeout} | |
import scala.concurrent.duration._ | |
import scala.util.Try | |
import scala.util.control.NonFatal | |
// A test HTTP server that returns files with a given name from /tmp.
// The test client fetches http://localhost:8549/test.txt (served from /tmp/test.txt)
// and stores the response body in /tmp/out.txt.
/** Test HTTP server.
  *
  * Binds to `host:port` and answers GET requests by memory-mapping the file with
  * the requested name under `dir` and streaming it back as a chunked
  * `application/octet-stream` entity in pieces of at most `chunkSize` bytes.
  */
object TestServer extends App {
  implicit val system: ActorSystem = ActorSystem()
  import system.dispatcher
  implicit val materializer: FlowMaterializer = FlowMaterializer()
  implicit val askTimeout: Timeout = 5000.millis

  val host = "localhost"
  val port = 8549

  // Serve files from this directory
  val dir = new File("/tmp")
  val chunkSize = 4096

  import akka.http.model.HttpMethods._

  /** Maps a request to a response.
    *
    * GET requests are served from `dir`: a missing file yields 404, any other
    * non-fatal error yields 500. Every other request yields 404.
    */
  val requestHandler: HttpRequest => HttpResponse = {
    case HttpRequest(GET, uri, _, _, _) =>
      val path = makePath(dir, new File(uri.path.toString()))
      println(s"Received request for $path (uri = $uri)")
      Try {
        val mappedByteBuffer = mmap(path)
        val iterator = new ByteBufferIterator(mappedByteBuffer, chunkSize)
        val chunks = Source(iterator).map(ChunkStreamPart.apply)
        HttpResponse(entity = HttpEntity.Chunked(MediaTypes.`application/octet-stream`, chunks))
      }.recover {
        // A file that does not exist is a client-side "not found", not a server error.
        case _: java.nio.file.NoSuchFileException =>
          println(s"File not found: $path")
          HttpResponse(StatusCodes.NotFound, entity = s"File not found: ${path.getFileName}")
        case NonFatal(cause) =>
          // getMessage may be null; fall back to toString so the entity is never built from null.
          val message = Option(cause.getMessage).getOrElse(cause.toString)
          println(s"Nonfatal error: cause = $message")
          HttpResponse(StatusCodes.InternalServerError, entity = message)
      }.get
    case _: HttpRequest => HttpResponse(StatusCodes.NotFound, entity = "Unknown resource!")
  }

  val bindingFuture = IO(Http) ? Http.Bind(host, port)

  bindingFuture foreach {
    case Http.ServerBinding(localAddress, connectionStream) =>
      println(s"Listening on http:/$localAddress/")
      Source(connectionStream).foreach {
        case Http.IncomingConnection(remoteAddress, requestProducer, responseConsumer) =>
          println(s"Accepted new connection from $remoteAddress")
          // Pipe each incoming request through the handler and back to the connection.
          Source(requestProducer).map(requestHandler).to(Sink(responseConsumer)).run()
      }
  }

  // If binding fails, report the error and stop the actor system.
  bindingFuture.failed.foreach { ex =>
    println("error: " + ex.getMessage)
    system.shutdown()
  }

  /** Memory-maps the whole file at `path` read-only.
    *
    * The channel is closed before returning, even when `size()` or `map()`
    * throws; the returned buffer is used after the channel has been closed.
    *
    * @param path the file to map
    * @return a read-only buffer covering the file from offset 0 to its size
    */
  def mmap(path: Path): MappedByteBuffer = {
    val channel = FileChannel.open(path, StandardOpenOption.READ)
    try channel.map(FileChannel.MapMode.READ_ONLY, 0L, channel.size())
    finally channel.close()
  }

  /** Iterates over a read-only buffer in consecutive chunks of at most `chunkSize` bytes.
    *
    * Advances the position of `buffer` as chunks are consumed.
    *
    * @param buffer    the read-only buffer to split; its position is mutated
    * @param chunkSize maximum number of bytes per chunk, must be positive
    */
  class ByteBufferIterator(buffer: ByteBuffer, chunkSize: Int) extends Iterator[ByteString] {
    require(buffer.isReadOnly)
    require(chunkSize > 0)

    override def hasNext: Boolean = buffer.hasRemaining

    /** Returns the next chunk.
      *
      * @throws NoSuchElementException if the buffer has no bytes remaining
      */
    override def next(): ByteString = {
      if (!buffer.hasRemaining) throw new NoSuchElementException("next on exhausted ByteBufferIterator")
      val size = chunkSize min buffer.remaining()
      // Slice starts at the current position; limit it to this chunk's length.
      val temp = buffer.slice()
      temp.limit(size)
      buffer.position(buffer.position() + size)
      ByteString(temp)
    }
  }

  /** Resolves the requested file against `dir`.
    *
    * Only the last name component of `file` is used, so any directory
    * components in the request path are dropped.
    */
  def makePath(dir: File, file: File): Path =
    Paths.get(dir.getPath, file.getName)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Note: This is based on:
val akkaStream = "com.typesafe.akka" %% "akka-stream-experimental" % "0.11"
val akkaHttp = "com.typesafe.akka" %% "akka-http-experimental" % "0.11"