Skip to content

Instantly share code, notes, and snippets.

@Renkai
Created September 12, 2018 06:20
Show Gist options
  • Save Renkai/9104e9b755e361484b6721d6bd74fae0 to your computer and use it in GitHub Desktop.
Save Renkai/9104e9b755e361484b6721d6bd74fae0 to your computer and use it in GitHub Desktop.
Reset a Kafka consumer group's offsets to a specific timestamp
import java.time.{Instant, ZoneId, ZonedDateTime}
import java.util.Properties
import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import scala.collection.JavaConverters._
object KafkaResetSample {
  val properties: Properties = new Properties()
  properties.setProperty("bootstrap.servers", "127.0.0.1:9092")
  // Not a KafkaConsumer (0.9+ client) setting; it only mattered for the old Kafka 0.8
  // ZooKeeper-based consumer and is kept here for reference.
  properties.setProperty("zookeeper.connect", "127.0.0.1:2181")
  properties.setProperty("group.id", "jbhair")
  // Offsets are committed explicitly in main. Auto-commit is disabled so that polling in
  // checkSuccess (and close()) cannot commit advanced positions over the reset.
  properties.setProperty("enable.auto.commit", "false")
  val topic: String = "fordeal_events"

  /**
   * Resets the committed offsets of the configured `group.id` on every partition of
   * [[topic]] to the first record whose timestamp is at or after a fixed target time.
   * It then prints a few records read back from the group's new position.
   *
   * The target time is interpreted in the JVM's default time zone.
   *
   * NOTE(review): the consumer group should have no active members while this runs.
   * Committing on behalf of a group with live members is expected to be rejected.
   */
  def main(args: Array[String]): Unit = {
    val targetTime = ZonedDateTime.of(2018, 9, 7, 0, 0, 0, 0, ZoneId.systemDefault())
    println(s"targetTime = $targetTime")
    resetOffsetsTo(targetTime.toInstant.toEpochMilli)
    checkSuccess()
  }

  /** Creates a String/String consumer from [[properties]]. */
  private def newConsumer(): KafkaConsumer[String, String] =
    new KafkaConsumer[String, String](properties, new StringDeserializer, new StringDeserializer)

  /**
   * Commits, for every partition of [[topic]], the offset of the earliest record whose
   * timestamp is >= `epochMillis`. A partition with no such record is set to its end
   * offset, so the group resumes after its newest record.
   *
   * @throws IllegalArgumentException if the topic is unknown or has no partitions
   */
  private def resetOffsetsTo(epochMillis: Long): Unit = {
    val consumer = newConsumer()
    try {
      // listTopics().get returns null for an unknown topic; turn that into a clear error.
      val partitions = Option(consumer.listTopics().get(topic))
        .map(_.asScala.toList)
        .getOrElse(Nil)
        .map(info => new TopicPartition(info.topic(), info.partition()))
      require(partitions.nonEmpty, s"no partitions found for topic $topic")

      val query = partitions.map(tp => tp -> java.lang.Long.valueOf(epochMillis)).toMap
      val found = consumer.offsetsForTimes(query.asJava).asScala
      val endOffsets = consumer.endOffsets(partitions.asJava).asScala

      val newOffsets = partitions.map { tp =>
        // offsetsForTimes maps a partition to null when no record is at/after the timestamp.
        val offset: Long = found.get(tp).flatMap(Option(_)) match {
          case Some(offsetTimestamp) =>
            val timestamp = offsetTimestamp.timestamp()
            val dateTime = Instant.ofEpochMilli(timestamp).atZone(ZoneId.systemDefault())
            println(s"(topic,partition,offset,timestamp,dateTime) = ${(tp.topic(), tp.partition(), offsetTimestamp.offset(), timestamp, dateTime)}")
            offsetTimestamp.offset()
          case None =>
            val end: Long = endOffsets(tp)
            println(s"(topic,partition) = ${(tp.topic(), tp.partition())}: no record at or after target time, using end offset $end")
            end
        }
        tp -> new OffsetAndMetadata(offset)
      }.toMap

      // Explicit offsets: no partition assignment (and thus no group join/poll) is needed.
      consumer.commitSync(newOffsets.asJava)
    } finally {
      consumer.close()
    }
  }

  /**
   * Subscribes to [[topic]] as the configured group and polls once, for up to 10 s.
   * Prints partition, offset and record time for at most 10 of the returned records.
   * Auto-commit is disabled in [[properties]], so this check leaves the committed offsets
   * untouched.
   */
  def checkSuccess(): Unit = {
    val consumer = newConsumer()
    try {
      consumer.subscribe(Seq(topic).asJava)
      val records = consumer.poll(10000).asScala.take(10).toList
      println(s"records = ${records.size}")
      records.foreach { record =>
        val time = Instant.ofEpochMilli(record.timestamp()).atZone(ZoneId.systemDefault())
        println(s"(partition,offset,time) = ${(record.partition(), record.offset(), time)}")
      }
    } finally {
      consumer.close()
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment