Created
March 14, 2016 17:28
-
-
Save sam/7731f883a62b329c6592 to your computer and use it in GitHub Desktop.
Trying to figure out how to clean up actor-per-request actors with akka-http.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Just inlining the revelant bits of https://github.com/hseeberger/akka-http-json/blob/master/akka-http-json4s/src/main/scala/de/heikoseeberger/akkahttpjson4s/Json4sSupport.scala#L44 | |
// for the gist. | |
trait Json4sSupport { | |
implicit def json4sUnmarshallerConverter[A: Manifest](serialization: Serialization, formats: Formats): FromEntityUnmarshaller[A] = | |
json4sUnmarshaller(manifest, serialization, formats) | |
/** | |
* HTTP entity => `A` | |
* | |
* @tparam A type to decode | |
* @return unmarshaller for `A` | |
*/ | |
implicit def json4sUnmarshaller[A: Manifest](implicit serialization: Serialization, formats: Formats): FromEntityUnmarshaller[A] = | |
Unmarshaller | |
.byteStringUnmarshaller | |
.forContentTypes(MediaTypes.`application/json`) | |
.mapWithCharset { (data, charset) => | |
val input = if (charset == HttpCharsets.`UTF-8`) data.utf8String else data.decodeString(charset.nioCharset.name) | |
serialization.read(input) | |
} | |
} | |
// This is basically just the example provided in the Akka documentation AFAICT: | |
// http://doc.akka.io/docs/akka/2.4.2/scala/http/client-side/request-level.html#Using_the_Future-Based_API_in_Actors | |
// | |
// The goal being the same Actor-Instance-per-Request model demonstrated there using preStart | |
// to fire off the request and receive to collect the HttpResponse. | |
// | |
// What the example doesn't show is how to clean up the actor after you're done. Which is | |
// the part I'm having trouble with. The NotFound case appears to work fine, but if I | |
// attempt to PoisonPill the Actor after the JObject Unmarshal case, I get an AbruptTerminationException. | |
class GetDocumentActor(requestor: ActorRef, id: String) extends Actor with Json4sSupport { | |
// Just a little helper to allow `uri / "segment"`. | |
implicit class UriBuilder(uri: Uri) { | |
def /(segment: String): Uri = { | |
uri.withPath(uri.path + "/" + segment) | |
} | |
} | |
import context.dispatcher | |
// Setup. You can pretty much ignore this. It's just to connect to a CouchDB database. | |
val config: Config = context.system.settings.config | |
val credentials: BasicHttpCredentials = BasicHttpCredentials(config.getString("cloudant.username"), | |
config.getString("cloudant.password")) | |
val authorization: Authorization = Authorization(credentials) | |
val databaseUri: Uri = Uri(s"https://${config.getString("cloudant.hostname")}") / config.getString("cloudant.database") | |
implicit val materializer = ActorMaterializer() | |
// JavaTimeSerializers added from json4s PR243 to add Java 8 support. | |
implicit val formats = DefaultFormats.lossless ++ JavaTimeSerializers.all | |
// Use json4s Jackson parser support since the json4s native parser is broken in 3.3.x | |
implicit val serialization = jackson.Serialization | |
val http = Http(context.system) | |
// Using preStart to fire off our request. | |
@throws[Exception](classOf[Exception]) | |
override def preStart(): Unit = { | |
// Fire off a singleRequest and pipe the response with forwarding for the requestor, which may not be parent. | |
http.singleRequest(HttpRequest(GET, uri = databaseUri / id, headers = List(authorization))).pipeTo(context.self)(requestor) | |
} | |
def receive: Receive = { | |
// If we get a NotFound, consume the response, forward it to the sender, | |
// then pipe the Done back to ourself. | |
case HttpResponse(NotFound, _, entity, _) => | |
sender ! NotFound | |
entity.dataBytes.runWith(Sink.ignore) pipeTo context.self | |
// If we found the document, deserialize it as a JObject, | |
// and then send ourself a Done message to trigger an eventual stop. | |
case HttpResponse(OK, _, entity, _) => | |
val self = context.self | |
Unmarshal(entity.withContentType(`application/json`)).to[JObject] | |
.pipeTo(sender) | |
// NOTE!!! Comment out the following line and everything "works". | |
// Though obviously you'll leak Actors that way as it'll never get | |
// cleaned up. Attempt to clean it up like this, and you'll get an | |
// AbruptTerminationException however. | |
.onComplete { case _ => self ! Done } | |
// If we get a Done, poison ourself. This appears to work fine for the | |
// NotFound case, but regularly (always?) causes an AbruptTerminationException | |
// for the OK case. | |
case Done => context.self ! PoisonPill | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
The scalatest that exposes the failure is just: