Skip to content

Instantly share code, notes, and snippets.

@tetafro
Last active November 24, 2020 09:18
Show Gist options
  • Save tetafro/d46ca64e30b1a60b162ffe25ddef89e1 to your computer and use it in GitHub Desktop.
Save tetafro/d46ca64e30b1a60b162ffe25ddef89e1 to your computer and use it in GitHub Desktop.
Overwrite Kafka consumer group offsets
package main
import (
"context"
"fmt"
"time"
kafka "github.com/segmentio/kafka-go"
_ "github.com/segmentio/kafka-go/snappy"
)
// setOffset overwrites the committed offsets of consumer group gid on topic
// so that every partition points at the offset corresponding to the Unix
// timestamp ts (in seconds).
//
// It connects to the first broker in brokers to list the topic's partitions,
// asks each partition leader for the offset at the given time, then joins the
// consumer group and commits the collected offsets in a single call.
//
// An error is returned if brokers is empty or if any step fails; underlying
// errors are wrapped so callers can inspect them with errors.Is / errors.As.
func setOffset(ctx context.Context, brokers []string, topic, gid string, ts int64) error {
	// Guard against indexing an empty slice below.
	if len(brokers) == 0 {
		return fmt.Errorf("no brokers provided")
	}

	conn, err := kafka.DialContext(ctx, "tcp", brokers[0])
	if err != nil {
		return fmt.Errorf("create connection: %w", err)
	}
	defer conn.Close()

	// Read partitions list
	parts, err := conn.ReadPartitions(topic)
	if err != nil {
		return fmt.Errorf("get partitions: %w", err)
	}

	// Get offsets by timestamp
	at := time.Unix(ts, 0)
	offsets := make(map[int]int64, len(parts))
	for _, p := range parts {
		offset, err := readPartitionOffset(ctx, topic, p, at)
		if err != nil {
			return err
		}
		offsets[p.ID] = offset
	}

	// Set offsets for partitions
	group, err := kafka.NewConsumerGroup(kafka.ConsumerGroupConfig{
		Brokers: brokers,
		Topics:  []string{topic},
		ID:      gid,
	})
	if err != nil {
		return fmt.Errorf("create consumer group: %w", err)
	}
	defer group.Close()

	gen, err := group.Next(ctx)
	if err != nil {
		return fmt.Errorf("get next generation: %w", err)
	}
	if err := gen.CommitOffsets(map[string]map[int]int64{topic: offsets}); err != nil {
		return fmt.Errorf("commit offsets: %w", err)
	}
	return nil
}

// readPartitionOffset dials the leader of partition p of topic and returns
// the offset that the leader reports for the time at.
//
// It exists as a separate function so that the leader connection is closed
// as soon as the offset has been read, instead of piling up deferred closes
// until the caller returns.
func readPartitionOffset(ctx context.Context, topic string, p kafka.Partition, at time.Time) (int64, error) {
	addr := fmt.Sprintf("%s:%d", p.Leader.Host, p.Leader.Port)
	c, err := kafka.DialLeader(ctx, "tcp", addr, topic, p.ID)
	if err != nil {
		return 0, fmt.Errorf("create connection to partition %d: %w", p.ID, err)
	}
	defer c.Close()

	offset, err := c.ReadOffset(at)
	if err != nil {
		return 0, fmt.Errorf("read offset of partition %d: %w", p.ID, err)
	}
	return offset, nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment