Created
April 23, 2014 02:07
-
-
Save ivantopo/11200612 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.util.Timeout | |
import ChunkingActorCassandraIterator.{Click, Interval} | |
import akka.actor._ | |
import akka.io.Tcp | |
import java.util.Date | |
import kamon.trace.TraceRecorder | |
import org.json4s.DefaultFormats | |
import scala.concurrent.ExecutionContext | |
import scala.reflect.ClassTag | |
import scala.util.Try | |
import spray.http.{ContentTypes, HttpEntity, MessageChunk, ChunkedMessageEnd} | |
import spray.httpx.Json4sSupport | |
import spray.httpx.marshalling.{MarshallingContext, Marshaller} | |
import spray.routing.SimpleRoutingApp | |
import akka.pattern.ask | |
import scala.concurrent.duration._ | |
/**
 * Example spray-routing application that answers `GET /clicks?start=<long>&end=<long>` by asking
 * `businessActor` for a [[CassandraPaginateIterator]] of clicks and completing the request with it.
 * The iterator is rendered through `cassandraIterableMarshaller`, which hands it to a freshly created
 * [[ChunkingActorCassandraIterator]].
 */
object IteratorExample extends App with SimpleRoutingApp with Json4sSupport {
  import scala.language.implicitConversions

  implicit val system: ActorSystem = ActorSystem("iterator-example")
  implicit val ec: ExecutionContext = system.dispatcher
  implicit val timeout: Timeout = Timeout(10.seconds)
  implicit val json4sFormats = DefaultFormats

  /** Adds `toDate` to a `Long`; a named class is used so the call does not go through reflection. */
  final class DateConvertible(value: Long) {
    def toDate: Date = new Date(value)
  }

  /** Implicit view from `Long` to [[DateConvertible]], enabling `someLong.toDate`. */
  implicit def long2DateConvertible(value: Long): DateConvertible = new DateConvertible(value)

  /**
   * Marshaller for [[CassandraPaginateIterator]]: creates one [[ChunkingActorCassandraIterator]] per
   * marshalled value and sends the iterator to it.
   *
   * @param marshaller marshaller for the individual elements, passed on to the chunking actor
   * @param refFactory factory used to create the chunking actor
   */
  implicit def cassandraIterableMarshaller[T](implicit marshaller: Marshaller[T], refFactory: ActorRefFactory): Marshaller[CassandraPaginateIterator[T]] =
    Marshaller[CassandraPaginateIterator[T]] { (value, ctx) =>
      // This happens as a callback of a future created inside the `complete` directive in the route, so it captures
      // the same `TraceContext` available at that moment. When the value (the `CassandraPaginateIterator`) is sent
      // to the `ChunkingActorCassandraIterator` that same `TraceContext` is available and attached to the message.
      refFactory.actorOf(Props.apply(new ChunkingActorCassandraIterator(marshaller, ctx))) ! value
    }

  // Created before the server is started so the route can never observe an uninitialised reference.
  // Replies to every `Interval` with a fixed two-page iterator of sample clicks.
  val businessActor: ActorRef = system.actorOf(Props(new Actor {
    def receive: Actor.Receive = {
      case Interval(_, _) =>
        sender ! new CassandraPaginateIterator[Click](List(
          List(Click("kamon.io", 100), Click("akka.io", 100)),
          List(Click("spray.io", 100), Click("google.com", 100))
        ).iterator)
    }
  }))

  startServer(interface = "0.0.0.0", port = 9090) {
    path("clicks") {
      get {
        parameters('start.as[Long], 'end.as[Long]) { (start, end) =>
          // NOTE(review): `new Date(long)` is not expected to throw, so this validation presumably always passes.
          val tryInterval = Try {
            (start.toDate, end.toDate)
          }
          validate(tryInterval.isSuccess, "Url params must be Date") {
            val (intervalStart, intervalEnd) = tryInterval.get
            //time("clicks") { // Probably you won't need this, Kamon will give you the metrics! :)
            complete {
              (businessActor ? Interval(intervalStart, intervalEnd)).mapTo[CassandraPaginateIterator[Click]]
            }
            //}
          }
        }
      }
    }
  }
}
/**
 * Iterator of pages that forwards both operations to the wrapped iterator; each element is one
 * page, i.e. an `Iterable[T]`.
 *
 * @param data the underlying iterator of pages
 * @tparam T element type of a page; a `ClassTag` for it is required at construction
 */
class CassandraPaginateIterator[T : ClassTag](data: Iterator[Iterable[T]]) extends Iterator[Iterable[T]] {

  /** True while the wrapped iterator still has a page to hand out. */
  override def hasNext: Boolean = data.hasNext

  /** Returns the next page from the wrapped iterator. */
  override def next(): Iterable[T] = data.next()
}
/** Messages exchanged by the chunking actor plus the sample domain types used by the example route. */
object ChunkingActorCassandraIterator {

  // --- domain types (added by ivantopo) ---

  /** Range delimited by a start and an end date. */
  case class Interval(start: Date, end: Date)

  /** A page name together with a count. */
  case class Click(page: String, count: Int)

  // --- acknowledgement protocol ---

  /** Acknowledgement attached to a written chunk; carries the iterator to keep reading from. */
  case class ACK(iterator: CassandraPaginateIterator[_])

  /** Acknowledgement attached to the write that closes the JSON array. */
  case object ClosedBracket
}
// No errors are handled here yet; which errors are possible?
/**
 * Writes the pages of a [[CassandraPaginateIterator]] to the client as one chunked JSON array.
 *
 * The first non-empty page opens the response (`[` followed by its items), every further non-empty
 * page is sent as a chunk prefixed with `,`, and a final `]` chunk closes the array before the
 * chunked message is ended. Each write asks for an acknowledgement and the next step only happens
 * when that acknowledgement arrives. An iterator without any non-empty page in first position
 * ends up as a plain `[]` response once it is exhausted.
 *
 * @param marshaller element marshaller supplied by the caller (items are serialised with json4s `write`)
 * @param ctx        marshalling context used to start the chunked response
 */
class ChunkingActorCassandraIterator[T <: AnyRef](marshaller: Marshaller[T], ctx: MarshallingContext) extends Actor { // with PlatformFormats { //with Logging {
  import ChunkingActorCassandraIterator._
  import org.json4s.native.Serialization._
  import context._

  implicit val formats = DefaultFormats

  // Set once the chunked response has been started; target of every subsequent chunk.
  var connectionActor: ActorRef = _

  def receive = firstChunk

  /** Initial behaviour: waits for the iterator and opens the chunked response. */
  def firstChunk: Receive = openingChunk orElse termination

  /** Behaviour after the response was opened: one page per acknowledgement. */
  def remainingChunks: Receive = followingChunks orElse termination

  // When the iterator message is received there is a `TraceContext` attached to it, so it becomes available during
  // the processing of this message. Then, the call to `ctx.startChunkedMessage` ends up sending a message with your
  // `ACK` internally, and that message, of course, carries the `TraceContext` with it. When the spray-can pipeline is
  // processing this message the `TraceContext` is available and when it sends the `ACK` back to you, the context
  // comes back too.. this is how the context is being propagated back and forth in this example.
  // Note: Be careful with pattern matching on parametrized types, erasure is playing against you!
  private def openingChunk: Receive = {
    case pages: CassandraPaginateIterator[T @unchecked] =>
      if (pages.hasNext) {
        val opening = makeChunk("[", pages.next())
        if (opening.isEmpty) {
          // Empty page: try again with the next one before opening the response.
          self ! pages
        } else {
          become(remainingChunks)
          connectionActor = ctx.startChunkedMessage(makeEntity(opening), Some(ACK(pages)), List.empty)
        }
      } else {
        sendEmptyIterator()
      }
  }

  private def followingChunks: Receive = {
    case ACK(pages: CassandraPaginateIterator[T @unchecked]) =>
      if (pages.hasNext) {
        val piece = makeChunk(",", pages.next())
        if (piece.isEmpty) {
          // Nothing to write for an empty page; move straight on to the next one.
          self ! ACK(pages)
        } else {
          connectionActor ! MessageChunk(piece).withAck(ACK(pages))
        }
      } else {
        finalMessage()
      }
  }

  // Handling shared by both behaviours: ending the chunked message and stopping the actor.
  private def termination: Receive = {
    case ClosedBracket           => connectionActor ! ChunkedMessageEnd.withAck(ChunkedMessageEnd)
    case ChunkedMessageEnd       => context.stop(self)
    case _: Tcp.ConnectionClosed => context.stop(self)
  }

  // Opens the response with an empty JSON array and asks for a `ClosedBracket` acknowledgement.
  private def sendEmptyIterator() = {
    connectionActor = ctx.startChunkedMessage(HttpEntity(ContentTypes.`application/json`, "[]"), Some(ClosedBracket))
  }

  // Serialises every item of the page and joins them with commas behind `prefix`; "" for an empty page.
  private def makeChunk(prefix: String, page: Iterable[T]): String =
    if (page.isEmpty) ""
    else page.map(item => write(item)).mkString(prefix, ",", "")

  // Writes the closing bracket of the array and asks for a `ClosedBracket` acknowledgement.
  private def finalMessage() = connectionActor ! MessageChunk("]").withAck(ClosedBracket)

  private def makeEntity(content: String) = HttpEntity(ContentTypes.`application/json`, content)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment