Created
May 11, 2017 07:22
-
-
Save edenhill/9dfc019b980a5eb3365c84524a1f12b0 to your computer and use it in GitHub Desktop.
confluent-kafka-go example to start consuming 5 messages from the end (tail 5)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Example function-based high-level Apache Kafka consumer | |
package main | |
/** | |
* Copyright 2016 Confluent Inc. | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
// consumer_example implements a consumer using the non-channel Poll() API | |
// to retrieve messages and events. | |
import ( | |
"fmt" | |
"github.com/confluentinc/confluent-kafka-go/kafka" | |
"os" | |
"os/signal" | |
"syscall" | |
) | |
func main() { | |
if len(os.Args) < 4 { | |
fmt.Fprintf(os.Stderr, "Usage: %s <broker> <group> <topics..>\n", | |
os.Args[0]) | |
os.Exit(1) | |
} | |
broker := os.Args[1] | |
group := os.Args[2] | |
topics := os.Args[3:] | |
sigchan := make(chan os.Signal, 1) | |
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) | |
c, err := kafka.NewConsumer(&kafka.ConfigMap{ | |
"bootstrap.servers": broker, | |
"group.id": group, | |
"go.application.rebalance.enable": true, // delegate Assign() responsibility to app | |
"session.timeout.ms": 6000, | |
"default.topic.config": kafka.ConfigMap{"auto.offset.reset": "earliest"}}) | |
if err != nil { | |
fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err) | |
os.Exit(1) | |
} | |
fmt.Printf("Created Consumer %v\n", c) | |
err = c.SubscribeTopics(topics, nil) | |
run := true | |
for run == true { | |
select { | |
case sig := <-sigchan: | |
fmt.Printf("Caught signal %v: terminating\n", sig) | |
run = false | |
default: | |
ev := c.Poll(100) | |
if ev == nil { | |
continue | |
} | |
switch e := ev.(type) { | |
case kafka.AssignedPartitions: | |
parts := make([]kafka.TopicPartition, | |
len(e.Partitions)) | |
for i, tp := range e.Partitions { | |
tp.Offset = kafka.OffsetTail(5) // Set start offset to 5 messages from end of partition | |
parts[i] = tp | |
} | |
fmt.Printf("Assign %v\n", parts) | |
c.Assign(parts) | |
case *kafka.Message: | |
fmt.Printf("%% Message on %s:\n%s\n", | |
e.TopicPartition, string(e.Value)) | |
case kafka.PartitionEOF: | |
fmt.Printf("%% Reached %v\n", e) | |
case kafka.Error: | |
fmt.Fprintf(os.Stderr, "%% Error: %v\n", e) | |
run = false | |
default: | |
fmt.Printf("Ignored %v\n", e) | |
} | |
} | |
} | |
fmt.Printf("Closing consumer\n") | |
c.Close() | |
} |
@apabla Use GetMetadata() to get the partitions for a topic.
@edenhill appreciate the feedback. I just figured it out :-)
Using
adminclient.GetMetadata(&singletopic, false, timeout); e != nil
I am using whatever is returned to construct a list of partitions to consume from.
topicpartionlist := []kafka.TopicPartition{}
Would love to contribute to the community and share some of this work.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hello, could you please advise on how to obtain access to all partitions associated with a topic? Is this the only way to locate topic partitions?
case kafka.AssignedPartitions:
parts := make([]kafka.TopicPartition,