Created September 12, 2018 06:20
Save Renkai/9104e9b755e361484b6721d6bd74fae0 to your computer and use it in GitHub Desktop.
Reset Kafka consumer-group offsets to a specific timestamp
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.time.{Instant, ZoneId, ZonedDateTime}
import java.util.Properties

import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

import scala.collection.JavaConverters._
/** Resets the committed offsets of consumer group `group.id` on [[topic]] to the first record
  * at or after a fixed point in time. It then prints a few records as a sanity check.
  *
  * Run this only while the group has no active members. Rewriting offsets under live consumers
  * is unsafe, and the commit may be rejected.
  */
object KafkaResetSample {
  val properties = new Properties()
  properties.setProperty("bootstrap.servers", "127.0.0.1:9092")
  // NOTE(review): KafkaConsumer talks to the brokers directly and does not appear to use this
  // setting. offsetsForTimes needs a 0.10.1+ client anyway. Kept from the original sample.
  properties.setProperty("zookeeper.connect", "127.0.0.1:2181")
  properties.setProperty("group.id", "jbhair")
  // main() commits offsets explicitly. With auto-commit on, the verification consumer in
  // checkSuccess() could commit positions past the freshly reset offsets when it closes.
  properties.setProperty("enable.auto.commit", "false")
  val topic = "fordeal_events"

  /** For every partition of [[topic]], commits the offset of the earliest record whose timestamp
    * is at or after 2018-09-07T00:00 in the JVM's default time zone.
    *
    * Partitions with no such record are set to their end offset. If the reset happened, this
    * then runs [[checkSuccess]].
    */
  def main(args: Array[String]): Unit = {
    val targetTime = ZonedDateTime.of(2018, 9, 7, 0, 0, 0, 0, ZoneId.systemDefault())
    println(s"targetTime = $targetTime")
    val epochMillis = targetTime.toInstant.toEpochMilli

    val consumer = newConsumer()
    val reset =
      try {
        val partitions = topicPartitions(consumer)
        if (partitions.isEmpty) {
          System.err.println(s"no partitions found for topic $topic; nothing to reset")
          false
        } else {
          commitOffsetsForTime(consumer, partitions, epochMillis)
          true
        }
      } finally consumer.close()

    if (reset) checkSuccess()
  }

  /** Polls up to 10 records of [[topic]] as group `group.id` and prints their partition, offset
    * and time. This lets you eyeball that consumption restarts near the target time.
    * It never commits offsets, because auto-commit is disabled in [[properties]].
    */
  def checkSuccess(): Unit = {
    val consumer = newConsumer()
    try {
      consumer.subscribe(Seq(topic).asJava)
      val records = consumer.poll(10000).asScala.take(10).toSeq
      println(s"records = ${records.size}")
      records.foreach { record =>
        val time = Instant.ofEpochMilli(record.timestamp()).atZone(ZoneId.systemDefault())
        println(s"(partition,offset,time) = ${(record.partition(), record.offset(), time)}")
      }
    } finally consumer.close()
  }

  /** Creates a String/String consumer from the shared [[properties]]. */
  private def newConsumer(): KafkaConsumer[String, String] =
    new KafkaConsumer[String, String](properties, new StringDeserializer, new StringDeserializer)

  /** Returns all partitions of [[topic]], ordered by partition number.
    * Returns an empty list if the client reports no partitions. The call is wrapped in
    * `Option` in case `partitionsFor` returns null for an unknown topic (this depends on
    * the client version).
    */
  private def topicPartitions(consumer: KafkaConsumer[String, String]): List[TopicPartition] =
    Option(consumer.partitionsFor(topic))
      .map(_.asScala.toList)
      .getOrElse(Nil)
      .map(info => new TopicPartition(info.topic(), info.partition()))
      .sortBy(_.partition())

  /** Looks up the offset for `epochMillis` on each partition and commits it for the group.
    *
    * @param consumer    consumer used for the lookups and the commit
    * @param partitions  partitions to reset (all partitions of [[topic]])
    * @param epochMillis target time in milliseconds since the epoch
    */
  private def commitOffsetsForTime(
      consumer: KafkaConsumer[String, String],
      partitions: List[TopicPartition],
      epochMillis: Long
  ): Unit = {
    // Assign partitions manually. A group subscription needs a rebalance first and may give
    // this consumer only some of the partitions.
    consumer.assign(partitions.asJava)
    val query = partitions.map(tp => tp -> Long.box(epochMillis)).toMap
    val found = consumer.offsetsForTimes(query.asJava).asScala
    // offsetsForTimes maps a partition to null when it has no record at or after the timestamp.
    // Such partitions fall back to their end offset, so all of their older records are skipped.
    val endOffsets = consumer.endOffsets(partitions.asJava).asScala

    val targets = partitions.map { tp =>
      found.get(tp).flatMap(Option(_)) match {
        case Some(offsetTimestamp) =>
          val offset = offsetTimestamp.offset()
          val timestamp = offsetTimestamp.timestamp()
          val dateTime = Instant.ofEpochMilli(timestamp).atZone(ZoneId.systemDefault())
          println(
            s"(topic,partition,offset,timestamp,dateTime) = ${(tp.topic(), tp.partition(), offset, timestamp, dateTime)}")
          tp -> offset
        case None =>
          val offset = endOffsets(tp).longValue()
          println(
            s"(topic,partition,offset) = ${(tp.topic(), tp.partition(), offset)} (no record at or after target; using end offset)")
          tp -> offset
      }
    }

    // Commit explicitly rather than relying on consumer positions: seekToEnd-style positions
    // are lazy and would not be included in a no-arg commitSync().
    consumer.commitSync(
      targets.map { case (tp, offset) => tp -> new OffsetAndMetadata(offset) }.toMap.asJava)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.