Skip to content

Instantly share code, notes, and snippets.

@kiro
Created February 11, 2016 15:17
Show Gist options
  • Save kiro/3fb5a80a2196d694cc61 to your computer and use it in GitHub Desktop.
Save kiro/3fb5a80a2196d694cc61 to your computer and use it in GitHub Desktop.
// Package-level settings shared by the producer and the consumers.
var (
	// partitions is the value of the -partitions flag (default 8). It is
	// only meaningful after flag.Parse has been called.
	partitions = flag.Int("partitions", 8, "number of partitions")

	// topic is the Kafka topic that messages are produced to and consumed from.
	topic = "test"

	// start is captured at package initialization; produce and main compare
	// against it to stop after one minute.
	start = time.Now()
)
// produce sends numbered messages to topic until more than one minute has
// passed since start, or until an operation fails.
//
// Message i carries the payload "Message <i>" and has its Partition field set
// to i % *partitions, so the indices cycle through 0..*partitions-1.
// NOTE(review): whether sarama honours the Partition field depends on the
// partitioner configured on the client returned by kafka.Client — confirm.
//
// It returns the first error from obtaining the client, creating a producer,
// or sending a message, and nil when the time limit is reached.
func produce() error {
	client, err := kafka.Client()
	if err != nil {
		return err
	}

	i := 0
	for {
		// NOTE(review): a new SyncProducer is created and closed for every
		// message. This looks deliberate for the benchmark, so it is kept —
		// confirm before hoisting it out of the loop.
		producer, err := sarama.NewSyncProducerFromClient(client)
		if err != nil {
			return err
		}

		i++
		// Sprintf, not Sprint: Sprint does not interpret the %d verb.
		msg := fmt.Sprintf("Message %d", i)
		partition := i % *partitions

		_, _, err = producer.SendMessage(&sarama.ProducerMessage{
			Topic:     topic,
			Value:     sarama.StringEncoder(msg),
			Partition: int32(partition),
		})
		if err != nil {
			// Release the producer before bailing out; the send error is the
			// one worth reporting, so a secondary close error is only logged.
			if closeErr := producer.Close(); closeErr != nil {
				log.Error(closeErr)
			}
			return err
		}

		if i%10 == 0 {
			log.Info(time.Now(), "Produced ", i, " messages.")
		}

		// A failed close does not stop the run, but it should not be silent.
		if err := producer.Close(); err != nil {
			log.Error(err)
		}

		if time.Since(start) > time.Minute {
			break
		}
	}
	return nil
}
// consume starts one goroutine per partition index in [0, *partitions) that
// reads topic from sarama.OffsetNewest.
//
// Every received message is reported as an empty struct on the returned
// unbuffered channel; errors from a partition consumer are logged. The
// goroutines loop forever and the channel is never closed.
//
// It returns a nil channel and the error if the client, the consumer, or any
// partition consumer cannot be created.
func consume() (chan struct{}, error) {
	client, err := kafka.Client()
	if err != nil {
		return nil, err
	}

	consumer, err := sarama.NewConsumerFromClient(client)
	if err != nil {
		return nil, err
	}

	received := make(chan struct{})
	count := int32(*partitions)
	for p := int32(0); p < count; p++ {
		pc, err := consumer.ConsumePartition(topic, p, sarama.OffsetNewest)
		if err != nil {
			return nil, err
		}

		// Forward this partition's traffic: one signal per message, one log
		// line per error.
		go func(pc sarama.PartitionConsumer) {
			for {
				select {
				case <-pc.Messages():
					received <- struct{}{}
				case consumeErr := <-pc.Errors():
					log.Error(consumeErr)
				}
			}
		}(pc)
	}

	return received, nil
}
func main() {
flag.Parse()
f, err := os.Create("cpu.pprof")
if err != nil {
fmt.Println("Error: ", err)
}
pprof.StartCPUProfile(f)
go func() {
http.ListenAndServe("localhost:6060", nil)
}()
c, err := consume()
if err != nil {
panic(err)
}
go produce()
i := 0
for range c {
i++
if i%1000 == 0 {
log.Info(i)
}
if time.Now().Sub(start) > time.Minute {
break
}
}
pprof.StopCPUProfile()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment