-
-
Save codejitsu/c3c2e006f64b104c342b to your computer and use it in GitHub Desktop.
akka http file server
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package akkahttptest | |
import akka.http.Http | |
import akka.stream.ActorFlowMaterializer | |
import akka.actor.ActorSystem | |
import akka.stream.scaladsl.{Sink, Source} | |
import akka.http.model._ | |
/**
 * Minimal akka-http client.
 *
 * Sends one default-constructed `HttpRequest()` over an outgoing connection to
 * port 80 on `host`. When the response status is `200 OK` the body is copied to
 * standard output chunk by chunk; otherwise only the status is printed.
 * The program then blocks until a byte is read from standard input and shuts
 * the actor system down.
 */
object TestClient extends App {
  implicit val system = ActorSystem("ServerTest")
  implicit val materializer = ActorFlowMaterializer()

  val host = "127.0.0.1"

  val httpClient = Http(system).outgoingConnection(host, 80)

  /** Sink that prints the body of successful responses and the status of all others. */
  val printChunksConsumer = Sink.foreach[HttpResponse] { res =>
    if (res.status == StatusCodes.OK) {
      // Use the Scala-API `dataBytes` accessor (not the Java-API `getDataBytes()`)
      // and perform the side effect in a foreach sink instead of inside `map`.
      res.entity.dataBytes.runWith(Sink.foreach { chunk =>
        System.out.write(chunk.toArray)
        System.out.flush()
      })
    } else {
      println(res.status)
      // Consume the unwanted body so the response stream is not left dangling.
      res.entity.dataBytes.runWith(Sink.ignore)
    }
  }

  val finishFuture = Source.single(HttpRequest()).via(httpClient).runWith(printChunksConsumer)

  // Without this a refused connection or a failed request would go completely unnoticed.
  finishFuture.failed.foreach { cause =>
    Console.err.println("Request failed: " + cause)
  }(system.dispatcher)

  // Keep the JVM alive until the user presses a key, then stop the actor system.
  System.in.read()
  system.shutdown()
  system.awaitTermination()
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package akkahttptest | |
import java.nio.channels.FileChannel | |
import java.nio.file.{Path, Paths, StandardOpenOption} | |
import java.nio.{ByteBuffer, MappedByteBuffer} | |
import akka.actor.ActorSystem | |
import akka.http.Http | |
import akka.http.model.HttpEntity.ChunkStreamPart | |
import akka.http.model._ | |
import akka.stream.ActorFlowMaterializer | |
import akka.stream.scaladsl.{Sink, Source} | |
import akka.util.{ByteString, Timeout} | |
import scala.concurrent.Future | |
import scala.concurrent.duration._ | |
import scala.util.Try | |
import scala.util.control.NonFatal | |
/**
 * Iterates over the remaining content of a read-only [[java.nio.ByteBuffer]] in
 * pieces of at most `chunkSize` bytes, each returned as a `ByteString`.
 *
 * Iteration advances the position of the supplied `buffer` itself, so the
 * buffer is consumed as the iterator is consumed.
 *
 * @param buffer    read-only buffer to read from, starting at its current position
 * @param chunkSize maximum number of bytes per element; must be positive
 * @throws IllegalArgumentException if `buffer` is not read-only or `chunkSize` is not positive
 */
class ByteBufferIterator(buffer: ByteBuffer, chunkSize: Int) extends Iterator[ByteString] {
  require(buffer.isReadOnly, "buffer must be read-only")
  require(chunkSize > 0, s"chunkSize must be positive, got $chunkSize")

  /** True while the underlying buffer still has unread bytes. */
  override def hasNext: Boolean = buffer.hasRemaining

  /**
   * Returns the next chunk and advances the buffer past it.
   *
   * @throws NoSuchElementException if the buffer has no bytes left
   */
  override def next(): ByteString = {
    // Honour the Iterator contract instead of handing out empty chunks forever.
    if (!buffer.hasRemaining)
      throw new NoSuchElementException("next on exhausted ByteBufferIterator")

    val size = chunkSize min buffer.remaining()
    // The slice starts at the current position; limiting it to `size` yields exactly one chunk.
    val chunk = buffer.slice()
    chunk.limit(size)
    buffer.position(buffer.position() + size)
    ByteString(chunk)
  }
}
/**
 * Minimal akka-http file server.
 *
 * Binds to `localhost:8080` and answers every `GET` request by memory-mapping
 * the file named by the request path and streaming it back as a chunked
 * `application/octet-stream` response in 4096-byte chunks. All other requests
 * receive `404 Not Found`. The server runs until a byte is read from standard
 * input, then shuts the actor system down.
 */
object Main extends App {

  /**
   * Memory-maps the whole file at `path` in read-only mode.
   *
   * @param path file to map
   * @return a read-only mapping covering the file from offset 0 to its current size
   */
  def map(path: Path): MappedByteBuffer = {
    val channel = FileChannel.open(path, StandardOpenOption.READ)
    // Close the channel even when mapping fails; an established mapping does not
    // depend on the channel that created it, so closing here is safe.
    try channel.map(FileChannel.MapMode.READ_ONLY, 0L, channel.size())
    finally channel.close()
  }

  implicit val system = ActorSystem()
  implicit val materializer = ActorFlowMaterializer()
  implicit val askTimeout: Timeout = 500.millis

  import HttpMethods._

  /** Synchronous handler turning a request into a file-streaming (or error) response. */
  val requestHandler: HttpRequest ⇒ HttpResponse = {
    case HttpRequest(GET, uri, _, _, _) =>
      // NOTE(security): the request path is used verbatim as a filesystem path, so any
      // file readable by this process can be downloaded. Acceptable for a local demo
      // only; restrict lookups to a dedicated root directory before exposing this.
      try {
        val mappedByteBuffer = map(Paths.get(uri.path.toString()))
        // Build a fresh iterator over an independent read-only view on every
        // materialization, so the source never shares a half-consumed iterator.
        val chunks = Source(() => new ByteBufferIterator(mappedByteBuffer.asReadOnlyBuffer(), 4096)).map { bytes =>
          println("Chunk of size " + bytes.size)
          ChunkStreamPart(bytes)
        }
        HttpResponse(entity = HttpEntity.Chunked(MediaTypes.`application/octet-stream`, chunks))
      } catch {
        // A missing file is a client-side problem, not a server failure.
        case _: java.nio.file.NoSuchFileException =>
          HttpResponse(StatusCodes.NotFound, entity = "Unknown resource!")
        case NonFatal(cause) =>
          // getMessage may be null; fall back to the exception's string form.
          val message: String = Option(cause.getMessage).getOrElse(cause.toString)
          HttpResponse(StatusCodes.InternalServerError, entity = message)
      }

    case _: HttpRequest ⇒ HttpResponse(StatusCodes.NotFound, entity = "Unknown resource!")
  }

  val serverSource: Source[Http.IncomingConnection, Future[Http.ServerBinding]] =
    Http(system).bind(interface = "localhost", port = 8080)

  val bindingFuture: Future[Http.ServerBinding] = serverSource.to(Sink.foreach { connection =>
    // foreach materializes the source
    println("Accepted new connection from " + connection.remoteAddress)
    // ... and then actually handle the connection
    connection.handleWithSyncHandler(requestHandler)
  }).run()

  // Surface bind errors (e.g. port already in use) instead of silently serving nothing.
  bindingFuture.failed.foreach { cause =>
    Console.err.println("Failed to bind to localhost:8080: " + cause)
  }(system.dispatcher)

  // Keep the server running until the user presses a key, then stop the actor system.
  System.in.read()
  system.shutdown()
  system.awaitTermination()
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment