kafka concurrent batch commit
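Workers process records from a partition concurrently and may finish out of order, so offsets cannot simply be committed as each task completes. Instead, commit() only marks a task as finished, and read(), which owns the single-threaded KafkaConsumer, periodically commits the longest contiguous prefix of finished offsets per partition. This gives at-least-once delivery without ever skipping an unprocessed record.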
import java.util.Properties
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
import scala.collection.JavaConverters._

val kafkaProps = new Properties()
kafkaProps.put("bootstrap.servers", endpoint)
kafkaProps.put("key.serializer", classOf[ByteArraySerializer])
kafkaProps.put("key.deserializer", classOf[ByteArrayDeserializer])
kafkaProps.put("value.serializer", classOf[ByteArraySerializer])
kafkaProps.put("value.deserializer", classOf[ByteArrayDeserializer])
kafkaProps.put("group.id", "CrawlerTasksStorage")
kafkaProps.put("max.poll.records", "1000")
// auto-commit is off: offsets are committed manually in read()
kafkaProps.put("enable.auto.commit", "false")

// one Properties object serves both clients; each ignores the other's keys
val kafkaConsumer = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaProps)
val kafkaProducer = new KafkaProducer[Array[Byte], Array[Byte]](kafkaProps)

// tasks acknowledged by workers but not yet committed to Kafka
val tasksToCommit = collection.mutable.Set[Task]()
// task id -> the consumer record it came from (for partition and offset)
val tasksAndPartitions = collection.mutable.Map[String, ConsumerRecord[_, _]]()
def save(task: Task): Unit = {
  val topic = s"${appname}_${task.taskType}_tasks"
  val value = itmo.escience.sncrawler.util.Serialization.serialize(task)
  println(s"Sending task=${task.id} ${task.taskType} to kafka topic=$topic")
  kafkaProducer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, value))
  kafkaProducer.flush() // block until the record is actually written
}
def read(taskType: String, limit: Int = 10000): List[Task] = {
  println(s"Reading tasks for app=$appname taskType=$taskType")
  val topicName = s"${appname}_${taskType}_tasks"
  kafkaConsumer.listTopics().asScala.find(_._1 == topicName) match {
    case Some((t, partitions)) =>
      val topicPartitions = partitions.asScala.map(p => new TopicPartition(p.topic(), p.partition()))
      kafkaConsumer.assign(topicPartitions.asJava)
      val endOffsets = kafkaConsumer.endOffsets(topicPartitions.asJava)
      /** Commit acknowledged tasks.
       * This must happen here, on the polling thread, because KafkaConsumer
       * is not thread-safe; workers only enqueue tasks via commit() below. */
      if (tasksToCommit.nonEmpty) {
        println(s"committing tasks=${tasksToCommit.map(_.id)}")
        tasksToCommit.synchronized {
          tasksToCommit.map(t => t -> tasksAndPartitions(t.id))
            .groupBy(_._2.partition())
            .foreach { case (partition, tasksAndRecords) =>
              val topicPartition = new TopicPartition(topicName, partition)
              val end = endOffsets.get(topicPartition) // end offset of this partition (currently unused)
              val committed = Option(kafkaConsumer.committed(topicPartition)).map(_.offset()).getOrElse(0L)
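              // Worked example (illustrative numbers, not from the source):
              // committed = 100, acknowledged offsets = [100, 101, 103].
              // Relative offsets are [0, 1, 3]; zipWithIndex pairs them as
              // [(0,0), (1,1), (3,2)]; takeWhile keeps pairs with delta == index,
              // so delta = 2 and we commit offset 102. Record 103 stays pending
              // until 102 is acknowledged, so no record is ever skipped.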
              val offsets = tasksAndRecords.map(_._2.offset()).toList.sorted
              // length of the contiguous run of acknowledged offsets starting at `committed`
              val delta = offsets.map(_ - committed).zipWithIndex.takeWhile { case (o, i) => o == i }.size
              val newOffset = committed + delta // offset of the next record to consume
              kafkaConsumer.commitSync(Map(topicPartition -> new OffsetAndMetadata(newOffset)).asJava)
              // drop only the tasks actually covered by this commit
              tasksAndRecords.toList.sortBy(_._2.offset()).take(delta).foreach { case (task, _) =>
                tasksToCommit -= task
                tasksAndPartitions -= task.id
              }
            }
        }
      }
      /** Reading new tasks */
      val records = kafkaConsumer.poll(1000)
      val tasksAndRecords = records.asScala.map(r => r -> makeTask(r))
      // remember each record so its partition and offset can be found at commit time
      tasksAndRecords.foreach { case (record, task) =>
        tasksAndPartitions += task.id -> record
      }
      val tasks = tasksAndRecords.map(_._2).toList
      println(s"read tasks ${tasks.map(_.id)} from $t")
      tasks
    case _ => List[Task]()
  }
}
/** Called by worker threads when a task has been fully processed.
 * Only the shared set is touched here; the actual Kafka commit happens
 * on the polling thread inside read(). */
override def commit(task: Task): Unit = {
  tasksToCommit.synchronized {
    tasksToCommit += task
  }
}
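A minimal usage sketch (the thread-pool wiring, the process function, and the "profile" task type are illustrative assumptions, not part of the gist): the polling thread drives read() in a loop, hands tasks to workers, and each worker acknowledges via commit().

// Illustrative wiring (assumed names), not part of the original gist
import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

implicit val workerPool: ExecutionContext =
  ExecutionContext.fromExecutor(Executors.newFixedThreadPool(8))

def process(task: Task): Unit = ??? // hypothetical crawler logic

while (true) {
  // read() also commits any offsets acknowledged since the last poll
  val tasks = read("profile") // "profile" is an assumed taskType
  tasks.foreach { task =>
    Future {
      process(task)
      commit(task) // thread-safe: only enqueues the finished task
    }
  }
}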