Skip to content

Instantly share code, notes, and snippets.

@AlexeyRaga
Last active August 29, 2015 14:24
Show Gist options
  • Select an option

  • Save AlexeyRaga/f3868f8a98a96facee83 to your computer and use it in GitHub Desktop.

Select an option

Save AlexeyRaga/f3868f8a98a96facee83 to your computer and use it in GitHub Desktop.
Ackable Batching Streaming
import akka.actor.ActorSystem
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl.Source
object Main extends App {
implicit val system = ActorSystem("mySystem")
implicit val materializer = ActorFlowMaterializer()
val srcProps = BatchingConsumingProducer.props(FakeKafkaConsumer.props, 5)
val src = Source.actorPublisher[Ackable[Array[Byte]]](srcProps)
src.runForeach { msg =>
println("got it!")
println(msg)
msg.ack
}
}
import akka.actor.{ActorRef, FSM, Props}
import akka.stream.actor.ActorPublisher
import BatchingConsumingProducer._
import akka.stream.actor.ActorPublisherMessage.Request
import scala.concurrent.duration._
class BatchingConsumingProducer(consumerProps: Props, batchSize: Long)
extends FSM[State, Data]
with ActorPublisher[Ackable[Array[Byte]]] {
val consumer = context.actorOf(consumerProps)
startWith(Batching, emptyBatch)
when(Batching, 1000.millis) {
case Event(Consumed(msg), Batch(msgs)) =>
val batch = msg :: msgs
if (batch.size >= batchSize) {
val startProgress = DeliveryProgress(0, batch)
val currentProgress = deliverDemand(startProgress)
goto(Delivering) using currentProgress
} else {
consumer ! Consume
stay using Batch(batch)
}
case Event(StateTimeout, Batch(Nil)) =>
consumer ! Consume
stay using emptyBatch
case Event(StateTimeout, Batch(msgs)) =>
goto(Delivering) using DeliveryProgress(0, msgs)
}
when(Delivering) {
case Event(Ack, DeliveryProgress(1, Nil)) =>
goto(Batching) using emptyBatch
case Event(Ack, DeliveryProgress(n, msgs)) =>
stay using DeliveryProgress(n-1, msgs)
case Event(Request(num), progress: DeliveryProgress) =>
val currentProgress = deliverDemand(progress)
stay using currentProgress
}
onTransition {
case Batching -> Delivering =>
if (totalDemand > 0) self ! Request(totalDemand)
case Delivering -> Batching =>
consumer ! Ack
consumer ! Consume
case _ -> Batching => consumer ! Consume
}
private def deliverDemand(progress: DeliveryProgress) = {
if (totalDemand == 0) progress
else {
val (send, wait) = progress.send.splitAt(totalDemand.toInt)
send.map(x => new AckableMessage(x, self)).foreach(onNext(_))
DeliveryProgress(progress.awaitAcks + send.size, wait)
}
}
initialize()
consumer ! Consume
}
object BatchingConsumingProducer {
def props(consumerProps: Props, batchSize: Long) = Props(classOf[BatchingConsumingProducer], consumerProps, batchSize)
sealed trait State
case object Batching extends State
case object Delivering extends State
sealed trait Data
case class Batch(messages: List[Array[Byte]]) extends Data
case class DeliveryProgress(awaitAcks: Int, send: List[Array[Byte]]) extends Data
def emptyBatch = Batch(List.empty)
private class AckableMessage[A](val value: A, ackTo: ActorRef) extends Ackable[A] {
def ack: Unit = ackTo ! Ack
def map[B](f: (A) => B): Ackable[B] = new AckableMessage[B](f(value), ackTo)
}
}
case object Ack
trait Ackable[A] {
def value: A
def ack: Unit
def map[B](f: A => B): Ackable[B]
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment