Last active
August 29, 2015 14:06
-
-
Save huntc/4c25821b0dfb24fe18fc to your computer and use it in GitHub Desktop.
Initial attempt at marshalling a bundle request given multipart formdata
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.typesafe.reactiveruntime.net | |
import akka.actor.{ActorRef, Actor} | |
import akka.http.Unmarshal | |
import akka.http.model.HttpMethods._ | |
import akka.http.model.{ HttpEntity, HttpRequest, HttpResponse, MultipartFormData, StatusCodes } | |
import akka.http.unmarshalling.Unmarshalling | |
import akka.pattern.ask | |
import akka.stream.scaladsl.{ ImplicitFlowMaterializer, Flow } | |
import akka.util.{ Timeout, ByteString } | |
import com.typesafe.reactiveruntime.load.LoadScheduler | |
import com.typesafe.reactiveruntime.load.LoadScheduler.BundleLoaded | |
import com.typesafe.reactiveruntime.start.StartScheduler.ScheduleBundle | |
import com.typesafe.reactiveruntime.{ Digest, _ } | |
import com.typesafe.reactiveruntime.net.Router._ | |
import java.util.UUID | |
import org.reactivestreams.Publisher | |
import scala.collection.immutable.Set | |
import scala.concurrent.Future | |
/** | |
* Handles requests w.r.t. bundle lifecycle commands. | |
*/ | |
trait BundleReqMarshalling extends ImplicitFlowMaterializer { | |
this: Actor => | |
protected val loadScheduler: ActorRef | |
protected val startScheduler: ActorRef | |
private type Requirement = (Double, Long, Long, Set[String]) | |
val bundleReqRoutes: PartialFunction[HttpRequest, Future[HttpResponse]] = { | |
case HttpRequest(POST, p"/bundles", _, entity, _) => loadBundle(entity) | |
case HttpRequest(POST, p"/bundles/$id/started", _, entity, _) => startBundle(id) | |
} | |
private val CpusRequiredField = "cpusRequired" | |
private val MemoryRequiredField = "memoryRequired" | |
private val TotalFileSizeField = "totalFileSize" | |
private val RolesField = "roles" | |
private val RequirementFields = Set( | |
CpusRequiredField, | |
MemoryRequiredField, | |
TotalFileSizeField, | |
RolesField) | |
private val settings = Settings(context.system) | |
private implicit val loaderBundleRetrieveTimeout = Timeout(settings.loaderBundleRetrieveTimeout) | |
private def loadBundle(entity: HttpEntity): Future[HttpResponse] = { | |
import context.dispatcher | |
val marshalled: Future[(Requirement, Publisher[(String, Digest, Publisher[ByteString])])] = | |
Unmarshal(entity).to[MultipartFormData] flatMap { | |
case Unmarshalling.Success(multiPartFormData) => | |
(Flow(multiPartFormData.parts) prefixAndTail RequirementFields.size).toFuture() flatMap { | |
case (prefixBodyParts, tailBodyParts) => | |
val prefixBodyPartsMappings = prefixBodyParts map { prefixBodyPart => | |
val name = prefixBodyPart.name.getOrElse("") | |
val textValue = Unmarshal(prefixBodyPart.entity).to[String] map { | |
case Unmarshalling.Success(value) => | |
value | |
case _ => | |
throw new IllegalArgumentException("There was a problem marshalling the form data") | |
} | |
textValue map (v => name -> v) | |
} | |
val requirements = Future.sequence(prefixBodyPartsMappings) map (_.toMap) map { | |
case r if r.keySet == RequirementFields => ( | |
r.get(CpusRequiredField).get.toDouble, | |
r.get(MemoryRequiredField).get.toLong, | |
r.get(TotalFileSizeField).get.toLong, | |
(r.get(RolesField).get split ' ').toSet) | |
case _ => | |
throw new IllegalArgumentException("There was a problem marshalling the form data - missing field.") | |
} | |
val fileSources = (Flow(tailBodyParts) collect { | |
case part if (part.name == Some("bundle") || part.name == Some("config")) && part.filename.isDefined => | |
val hash = part.filename.get.reverse.dropWhile(_ != '.').takeWhile(_ != '-').reverse | |
(part.name.get, Digest("SHA-256", hash.toByteString), part.entity.dataBytes(flowMaterializer)) | |
}).toPublisher() | |
requirements map { requirement => | |
(requirement, fileSources) | |
} | |
} | |
case e => | |
throw new IllegalArgumentException(s"There was a problem marshalling the multipart form data: $e") | |
} | |
val reply = for { | |
((cpusRequired, memoryRequired, totalFileSize, roles), sources) <- marshalled | |
bundleLoaded <- (loadScheduler ? LoadScheduler.LoadBundle( | |
UUID.randomUUID(), | |
sources, | |
cpusRequired, | |
memoryRequired, | |
totalFileSize, | |
roles)).mapTo[BundleLoaded] | |
} yield HttpResponse(entity = bundleLoaded.bundleId.toString) | |
reply recover { | |
case e: IllegalArgumentException => | |
HttpResponse(StatusCodes.BadRequest, entity = e.getMessage) | |
case e => | |
HttpResponse(StatusCodes.InternalServerError, entity = e.getMessage) | |
} | |
} | |
private def startBundle(id: String): Future[HttpResponse] = { | |
startScheduler ! ScheduleBundle(UUID.randomUUID(), BundleId(id), 1) | |
Future.successful(HttpResponse(entity = "Scheduled")) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment