Last active: January 23, 2020 05:14
Save sdball/f4f95cf764ae37891baf1e8e8abd1788 to your computer and use it in GitHub Desktop.
Simple generic Ruby Kafka producer/consumer for testing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Dependencies: the ruby-kafka client plus stdlib logging and option parsing.
require "kafka"
require "logger"
require "optparse"

# Filled in from command-line flags; a key exists only when its flag was passed.
options = {}

# This script's file name minus its extension. It doubles as the default
# client id and as the stem of the default log file.
program_ext = File.extname($PROGRAM_NAME)
script_name = File.basename($PROGRAM_NAME, program_ext)

# Fallbacks used when -l / -o are not given on the command line.
default_logfile = "logs/#{script_name}.log"
default_offset = "latest"
# Parse command-line flags into `options`. Bad or missing values abort with
# the usage text instead of surfacing as an uncaught exception backtrace.
parser = OptionParser.new do |opts|
  opts.banner = "Usage: kafka-consumer.rb [options]"

  opts.on("-t TOPIC", "--topic", "Kafka topic") do |topic|
    options[:topic] = topic
  end

  opts.on("-b BROKERS", "--brokers", "Kafka brokers, e.g. docker:9092. (optional, defaults to $KAFKA)") do |brokers|
    options[:brokers] = brokers.split(",")
  end

  opts.on("-l LOGFILE", "--log-file", "Ruby Kafka logfile (optional, defaults to #{default_logfile})") do |logfile|
    options[:logfile] = logfile
  end

  opts.on("--client-id CLIENT_ID", "Ruby Kafka client id (optional, defaults to #{script_name})") do |client_id|
    options[:client_id] = client_id
  end

  # Giving OptionParser the accepted values makes it reject anything else with
  # OptionParser::InvalidArgument (reported below) rather than a raw ArgumentError.
  opts.on("-o OFFSET", "--offset", %w[earliest latest], "Use \"earliest\" to fetch all messages in the topic, \"latest\" to only fetch messages produced after this consumer starts. Defaults to \"#{default_offset}\".") do |offset|
    options[:offset] = offset
  end

  # Integer coercion rejects non-numeric goals instead of letting them
  # silently turn into 0 ("no goal") through String#to_i.
  opts.on("-g GOAL", "--goal", Integer, "Set an expected number of messages to consume. (Use to time production to Kafka.)") do |goal|
    options[:goal] = goal
  end
end

begin
  parser.parse!
rescue OptionParser::ParseError => e
  abort("#{e.message}\n#{parser}")
end

# Without a topic every fetch would fail deep inside the Kafka client.
abort("missing argument: --topic\n#{parser}") unless options[:topic]
# Resolve effective settings (explicit flags first, then the defaults or the
# $KAFKA environment variable) and build the Kafka client.
logfile = options[:logfile] || default_logfile
# Logger.new raises Errno::ENOENT when the log directory (e.g. the default
# "logs/") does not exist yet, so create it first.
require "fileutils"
FileUtils.mkdir_p(File.dirname(logfile))
logger = Logger.new(logfile)

brokers = options[:brokers] ||
          ENV.fetch("KAFKA") { abort("No Kafka brokers: pass -b BROKERS or set $KAFKA.") }.split(",")
client_id = options[:client_id] || script_name
topic = options[:topic]
# ruby-kafka's fetch_messages takes :earliest / :latest symbols (or an integer offset).
offset = (options[:offset] || default_offset).to_sym
# nil (no -g flag) becomes 0, which the fetch loop treats as "no goal".
goal = options[:goal].to_i

kafka = Kafka.new(
  seed_brokers: brokers,
  client_id: client_id,
  socket_timeout: 20,
  logger: logger,
)
# Consume the topic until interrupted (Ctrl-C) or until --goal messages have
# been read. Each message is printed with its offset, the time elapsed since
# the first message, and a running count. A summary is always printed and the
# client closed on the way out.
partition = 0 # NOTE(review): only partition 0 is read; other partitions of the topic are ignored.
consumed = 0
start_time = nil # monotonic clock reading taken when the first message arrives

# Seconds since the first message. The monotonic clock is immune to wall-clock
# adjustments. Fractional seconds are kept because the difference of two
# whole-second Time#to_i values can be off by up to a second.
elapsed = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time }

begin
  loop do
    # NOTE(review): until the first message arrives `offset` is still :latest,
    # which ruby-kafka presumably re-resolves on every fetch. Messages produced
    # between two polls could therefore be skipped; confirm if exact counts matter.
    messages = kafka.fetch_messages(
      topic: topic,
      partition: partition,
      offset: offset,
    )

    messages.each do |message|
      start_time ||= Process.clock_gettime(Process::CLOCK_MONOTONIC)
      consumed += 1
      # The next fetch continues right after the newest message seen.
      offset = message.offset + 1
      runtime = format("%.2f", elapsed.call)
      puts "#{message.offset}: #{message.value} [Cumulative Runtime: #{runtime} seconds] [#{consumed} messages so far]"
    end

    # A goal of 0 (the default) means "run until interrupted".
    break if goal > 0 && consumed >= goal
  end
rescue Interrupt
  # Start the summary on a fresh line after Ctrl-C.
  puts
ensure
  puts "Consumed #{consumed} messages."
  puts "Ran for #{format("%.2f", elapsed.call)} seconds." if start_time
  kafka.close
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Dependencies: the ruby-kafka client, stdlib logging and option parsing, and
# the snappy gem (presumably loaded for the "snappy" --compression choice).
require "kafka"
require "logger"
require "optparse"
require "snappy"

# Filled in from command-line flags; a key exists only when its flag was passed.
options = {}

# This script's file name minus its extension. It doubles as the default
# client id and as the stem of the default log file.
program_ext = File.extname($PROGRAM_NAME)
script_name = File.basename($PROGRAM_NAME, program_ext)
default_logfile = "logs/#{script_name}.log"
# Parse command-line flags into `options`. Bad or missing values abort with
# the usage text instead of surfacing as an uncaught exception backtrace.
parser = OptionParser.new do |opts|
  opts.banner = "Usage: kafka-producer.rb [options]"

  opts.on("-t TOPIC", "--topic", "Kafka topic") do |topic|
    options[:topic] = topic
  end

  opts.on("-b BROKERS", "--brokers", "Kafka brokers, e.g. docker:9092. (optional, defaults to $KAFKA)") do |brokers|
    options[:brokers] = brokers.split(",")
  end

  opts.on("-l LOGFILE", "--log-file", "Ruby Kafka logfile (optional, defaults to #{default_logfile})") do |logfile|
    options[:logfile] = logfile
  end

  opts.on("-c COMPRESSION", "--compression", "Compression codec to use. \"gzip\" or \"snappy\"") do |compression|
    # The codec is passed on as a symbol (e.g. :gzip) to kafka.producer.
    options[:compression] = compression.to_sym
  end

  opts.on("--client-id CLIENT_ID", "Ruby Kafka client id (optional, defaults to #{script_name})") do |client_id|
    options[:client_id] = client_id
  end
end

begin
  parser.parse!
rescue OptionParser::ParseError => e
  abort("#{e.message}\n#{parser}")
end

# Without a topic every produce call would fail deep inside the Kafka client.
abort("missing argument: --topic\n#{parser}") unless options[:topic]
# Resolve effective settings (explicit flags first, then the defaults or the
# $KAFKA environment variable), then build the Kafka client and a producer
# that uses the requested compression codec.
logfile = options[:logfile] || default_logfile
# Logger.new raises Errno::ENOENT when the log directory (e.g. the default
# "logs/") does not exist yet, so create it first.
require "fileutils"
FileUtils.mkdir_p(File.dirname(logfile))
logger = Logger.new(logfile)

brokers = options[:brokers] ||
          ENV.fetch("KAFKA") { abort("No Kafka brokers: pass -b BROKERS or set $KAFKA.") }.split(",")
client_id = options[:client_id] || script_name
topic = options[:topic]

kafka = Kafka.new(
  seed_brokers: brokers,
  client_id: client_id,
  logger: logger,
)

# options[:compression] is nil when -c was not given.
producer = kafka.producer(compression_codec: options[:compression])
# Publish each stdin line as one message and deliver it immediately, so
# consumers see lines as they are typed or piped. Ctrl-C stops reading. Any
# still-buffered messages are then flushed, and the producer and client are
# always shut down.
produced = 0
begin
  $stdin.each_line do |line|
    # Strip the line terminator so the message holds only the payload.
    producer.produce(line.chomp, topic: topic)
    # Count only after the message was accepted into the buffer.
    produced += 1
    producer.deliver_messages
  end
rescue Interrupt
  # Start the summary on a fresh line after Ctrl-C.
  puts
ensure
  puts "Produced #{produced} messages."
  begin
    # Flush anything still buffered, e.g. after an interrupted delivery.
    producer.deliver_messages
  ensure
    # Release connections even if the final delivery raises.
    producer.shutdown
    kafka.close
  end
end
Sign up for free to join this conversation on GitHub.
Already have an account?
Sign in to comment