Skip to content

Instantly share code, notes, and snippets.

@gzoller
Created September 20, 2016 18:15
Show Gist options
  • Save gzoller/8847556ab1347b886d80cf53c3a1e867 to your computer and use it in GitHub Desktop.
Save gzoller/8847556ab1347b886d80cf53c3a1e867 to your computer and use it in GitHub Desktop.
case class SpeedGraphRC(system: ActorSystem, host: String, groupId: String, topic: String) extends GraphHolder[String] {
val count = new LinkedBlockingQueue[Int]()
val flow = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[akka.NotUsed] =>
import GraphDSL.Implicits._
type In = CommittableMessage[Array[Byte], String]
val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
.withBootstrapServers(host)
.withGroupId(groupId)
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
val src = Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
// Simulated "load" (would be a flow of other tasks in a real system)
val work = Flow[In].map { i =>
i
}.batch(10000, (In) => List(In))((batch, one) => batch :+ one)
val commit = Flow[List[In]].map { msg =>
msg.map { m =>
m.committableOffset.commitScaladsl()
count.add(1)
}
}
src ~> work ~> commit ~> Sink.ignore
ClosedShape
})
}
// ======= Run Test:
val num = 1000000
producer.populate(num, topic)
Thread.sleep(2000)
partitionInfo(topic)
println("Consuming...")
Aggregator.reset()
val sg = SpeedGraphRC(as, kafkaHost, "speed", topic)
sg.flow.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment