Last active
October 17, 2024 09:47
-
-
Save werneckpaiva/466e7c6bd1eca98ee4c004f37b544de9 to your computer and use it in GitHub Desktop.
Kafka Consumer - seek by timestamp
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Get the list of partitions | |
List<PartitionInfo> partitionInfos = consumer.partitionsFor(topicName); | |
// Transform PartitionInfo into TopicPartition | |
List<TopicPartition> topicPartitionList = partitionInfos | |
.stream() | |
.map(info -> new TopicPartition(topicName, info.partition())) | |
.collect(Collectors.toList()); | |
// Assign the consumer to these partitions | |
consumer.assign(topicPartitionList); | |
// Look for offsets based on timestamp | |
Map<TopicPartition, Long> partitionTimestampMap = topicPartitionList.stream() | |
.collect(Collectors.toMap(tp -> tp, tp -> timestampMs)); | |
Map<TopicPartition, OffsetAndTimestamp> partitionOffsetMap = consumer.offsetsForTimes(partitionTimestampMap); | |
// Force the consumer to seek for those offsets | |
partitionOffsetMap.forEach((tp, offsetAndTimestamp) -> consumer.seek(tp, offsetAndTimestamp.offset())); |
Author
werneckpaiva
commented
Nov 24, 2022
via email
timestampMs is the timestamp you are looking for. So, this is basically
creating a map with all the TopicPartitions and the timestamp you want. The
consumer.offsetsForTimes doesn´t require that you seek for the same
timestamp in all partitions, so the lambda function is making sure they are
all the same.
…On Thu, 24 Nov 2022 at 04:49, Fr33M0nk ***@***.***> wrote:
***@***.**** commented on this gist.
------------------------------
Hi, Thanks for this snippet.
What does timestampMs lambda expression look like?
—
Reply to this email directly, view it on GitHub
<https://gist.github.com/466e7c6bd1eca98ee4c004f37b544de9#gistcomment-4379973>
or unsubscribe
<https://github.com/notifications/unsubscribe-auth/AAAH2BXNX4VJNFKTMH4N6FDWJ3XWPBFKMF2HI4TJMJ2XIZLTSKBKK5TBNR2WLJDHNFZXJJDOMFWWLK3UNBZGKYLEL52HS4DFQKSXMYLMOVS2I5DSOVS2I3TBNVS3W5DIOJSWCZC7OBQXE5DJMNUXAYLOORPWCY3UNF3GS5DZVRZXKYTKMVRXIX3UPFYGLK2HNFZXIQ3PNVWWK3TUUZ2G64DJMNZZDAVEOR4XAZNEM5UXG5FFOZQWY5LFVA4TGNZWGU2DEONHORZGSZ3HMVZKMY3SMVQXIZI>
.
You are receiving this email because you authored the thread.
Triage notifications on the go with GitHub Mobile for iOS
<https://apps.apple.com/app/apple-store/id1477376905?ct=notification-email&mt=8&pt=524675>
or Android
<https://play.google.com/store/apps/details?id=com.github.android&referrer=utm_campaign%3Dnotification-email%26utm_medium%3Demail%26utm_source%3Dgithub>
.
--
Ricardo Paiva - System Engineer
London - UK
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment