Skip to content

Instantly share code, notes, and snippets.

@vipmax
Last active September 20, 2018 15:55
Show Gist options
  • Save vipmax/cd736a019a02c408a7501059a7943038 to your computer and use it in GitHub Desktop.
Save vipmax/cd736a019a02c408a7501059a7943038 to your computer and use it in GitHub Desktop.
kafka concurrent batch commit
val kafkaProps = new Properties()
kafkaProps.put("bootstrap.servers", endpoint)
kafkaProps.put("key.serializer", classOf[ByteArraySerializer])
kafkaProps.put("key.deserializer", classOf[ByteArrayDeserializer])
kafkaProps.put("value.serializer", classOf[ByteArraySerializer])
kafkaProps.put("value.deserializer", classOf[ByteArrayDeserializer])
kafkaProps.put("group.id", "CrawlerTasksStorage")
kafkaProps.put("max.poll.records", "1000")
kafkaProps.put("enable.auto.commit","false")
val kafkaConsumer = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaProps)
val kafkaProducer = new KafkaProducer[Array[Byte], Array[Byte]](kafkaProps)
val tasksToCommit = collection.mutable.Set[Task]()
val tasksAndPartitions = collection.mutable.Map[String, ConsumerRecord[_,_]]()
def save(task: Task) {
val topic = s"${appname}_${task.taskType}_tasks"
val value = itmo.escience.sncrawler.util.Serialization.serialize(task)
println(s"Sending task=${task.id} ${task.taskType} to kafka topic=$topic")
kafkaProducer.send(new ProducerRecord(topic, value))
kafkaProducer.flush()
}
def read(taskType:String, limit:Int = 10000) = {
println(s"Reading tasks for app=$appname taskType=$taskType")
val topicName = s"${appname}_${taskType}_tasks"
kafkaConsumer.listTopics().find(_._1 == topicName) match {
case Some((t, partitions)) =>
val topicPartitions = partitions.map(p => new TopicPartition(p.topic(), p.partition()))
kafkaConsumer.assign(topicPartitions)
val endOffsets = kafkaConsumer.endOffsets(topicPartitions)
/** commiting tasks
* it should be here because kafkaConsumer not available from different threads */
if (tasksToCommit.nonEmpty) {
println(s"commiting tasks=${tasksToCommit.map(_.id)}")
tasksToCommit.synchronized {
tasksToCommit.map(t => t -> tasksAndPartitions(t.id))
.groupBy(_._2.partition())
.foreach { case (partition, tasksAndRecords) =>
val topicPartition = new TopicPartition(topicName, partition)
val end = endOffsets.get(topicPartition)
val committed = Option(kafkaConsumer.committed(topicPartition)).map(_.offset()).getOrElse(0l)
val offsets = tasksAndRecords.map(_._2.offset()).toList.sorted
val delta = offsets.map(_ - committed).zipWithIndex.takeWhile { case (o, i) => o == i }.size
val newOffset = committed + delta
kafkaConsumer.commitSync(Map(topicPartition -> new OffsetAndMetadata(newOffset)))
tasksAndRecords.toList.sortBy(_._2.offset()).take(delta).foreach { case (task, _) =>
tasksToCommit -= task
tasksAndPartitions -= task.id
}
}
}
}
/** reading tasks */
val records = kafkaConsumer.poll(1000)
val tasksAndRecords = records.map(r => r -> makeTask(r))
tasksAndRecords.foreach { case (record, task) =>
tasksAndPartitions += task.id -> record
}
val tasks = tasksAndRecords.map(_._2)
println(s"read task ${tasks.map(_.id)} from $t")
tasks
case _ => List[Task]()
}
}
override def commit(task: Task): Unit = {
tasksToCommit.synchronized {
tasksToCommit += task
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment