Created
February 11, 2016 15:17
-
-
Save kiro/3fb5a80a2196d694cc61 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Package-level configuration and shared state for the benchmark.
var (
	// partitions is the number of topic partitions to produce to and consume
	// from; set with the -partitions flag (default 8).
	partitions = flag.Int("partitions", 8, "number of partitions")

	// topic is the Kafka topic used by both produce and consume.
	topic = "test"

	// start is captured at program initialization; produce and main compare
	// against it to stop after one minute.
	start = time.Now()
)
func produce() error { | |
client, err := kafka.Client() | |
if err != nil { | |
return err | |
} | |
i := 0 | |
for { | |
producer, err := sarama.NewSyncProducerFromClient(client) | |
if err != nil { | |
return err | |
} | |
i++ | |
msg := fmt.Sprint("Message %d", i) | |
partition := i % *partitions | |
_, _, err = producer.SendMessage(&sarama.ProducerMessage{ | |
Topic: topic, | |
Value: sarama.StringEncoder(msg), | |
Partition: int32(partition), | |
}) | |
if err != nil { | |
return err | |
} | |
if i%10 == 0 { | |
log.Info(time.Now(), "Produced ", i, " messages.") | |
} | |
producer.Close() | |
if time.Now().Sub(start) > time.Minute { | |
break | |
} | |
} | |
return nil | |
} | |
func consume() (chan struct{}, error) { | |
saramaCliemt, err := kafka.Client() | |
if err != nil { | |
return nil, err | |
} | |
consumer, err := sarama.NewConsumerFromClient(saramaCliemt) | |
if err != nil { | |
return nil, err | |
} | |
result := make(chan struct{}) | |
for i := int32(0); i < int32(*partitions); i++ { | |
partitionConsumer, err := consumer.ConsumePartition(topic, i, sarama.OffsetNewest) | |
if err != nil { | |
return nil, err | |
} | |
go func() { | |
for { | |
select { | |
case <-partitionConsumer.Messages(): | |
result <- struct{}{} | |
case err := <-partitionConsumer.Errors(): | |
log.Error(err) | |
} | |
} | |
}() | |
} | |
return result, nil | |
} | |
func main() { | |
flag.Parse() | |
f, err := os.Create("cpu.pprof") | |
if err != nil { | |
fmt.Println("Error: ", err) | |
} | |
pprof.StartCPUProfile(f) | |
go func() { | |
http.ListenAndServe("localhost:6060", nil) | |
}() | |
c, err := consume() | |
if err != nil { | |
panic(err) | |
} | |
go produce() | |
i := 0 | |
for range c { | |
i++ | |
if i%1000 == 0 { | |
log.Info(i) | |
} | |
if time.Now().Sub(start) > time.Minute { | |
break | |
} | |
} | |
pprof.StopCPUProfile() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment