- The base gem changed from ruby-kafka to rdkafka
- waterdrop gem too
- configuration must change
- librdkafka-devel package is required
- Responder is deleted
- Consume multiple messages
- Rename attribute
params
tomessages
in BaseConsumer
- Rename attribute
- Remove pidfile support
by infra team
- karafka:
2.0.0.beta3
- karafka-testing:
2.0.0.alpha3
# Delete config/initializers/waterdrop.rb
WaterDrop.setup do |config|
config.client_id = 'myapp'
config.logger = Rails.logger
config.kafka.seed_brokers = Settings.kafka.urls
config.kafka.sasl_scram_username = Settings.kafka.sasl_scram_username
config.kafka.sasl_scram_password = Settings.kafka.sasl_scram_password
config.kafka.sasl_scram_mechanism = Settings.kafka.sasl_scram_mechanism
config.kafka.ssl_ca_certs_from_system = Settings.kafka.ssl_ca_certs_from_system
config.deliver = !Rails.env.test?
end
# Add lib/kafka_producer.rb
class KafkaProducer
include Singleton
def initialize
@producer ||= WaterDrop::Producer.new do |config|
config.deliver = !Rails.env.test?
# https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
config.kafka = Settings.kafka.config.to_h
end
end
# For now, sync only
def produce(topic, payload)
producer.produce_sync(topic: topic, payload: payload)
end
# Align interface with WaterDrop v1
def self.call(payload, topic:)
instance.produce(topic, payload)
end
private
attr_reader :producer
end
rm -rf app/responders
# Delete eager load
# Rails.application.eager_load!
# Adjust to rdkafka
config.kafka = Settings.kafka.config.to_h
# Fix subscriber
# Karafka::Instrumentation::StdoutListener is not found on v2
# use Karafka::Instrumentation::LoggerListener. WaterDrop too
Karafka.monitor.subscribe(Karafka::Instrumentation::LoggerListener.new)
Karafka.producer.monitor.subscribe(WaterDrop::Instrumentation::LoggerListener.new(Karafka.logger))
# Delete
# KarafkaApp.boot!
# config/settings.yml
kafka:
config:
bootstrap.servers: localhost:9096
client.id: myapp
topics:
:
# config/settings/xxx.yml
kafka:
config:
bootstrap.servers: ...
# write config to connect broker
- Replace
WaterDrop::SyncProducer.call
toKafkaProducer.call
- Add
require 'kafka_producer'
- Replace
karafka_consumer_for
tokarafka.consumer_for
- Replace
publish_for_karafka
tokarafka.publish
Example implementation: https://github.com/karafka/karafka/blob/2.0/spec/integrations/consumption/single_message_consumer.rb#L9-L28
We manage the foreground process with daemontools. Send TERM signal to refer to pidfile during deployment.
But pidfile support is removed!
It is hard to change runfile when karafka is updated to v2 each product.
Therefore, change the pidfile management in v1.4 first and deploy it, and then modify the runfile in all of consumer apps.
- Write fixed pidfile in mixin module. Do not refer
cli.options
module KarafkaCliServerMixin def call # We manage the foreground process with daemontools. # Send TERM signal to refer to PIDFILE during deployment. pidfile = Rails.root.join('tmp/pids/karafka.pid') FileUtils.mkdir_p(File.dirname(pidfile)) File.write(pidfile, ::Process.pid) super ensure File.delete(pidfile) end end Karafka::Cli::Server.prepend(KarafkaCliServerMixin)
- Remove
-p
option in daemontools runfile - Please upgrade v2 at any time you like
... or change daemon management to systemd