@arisawa
Created June 27, 2022 05:26
Upgrade to karafka v2

Prerequisites

  • The base gem changed from ruby-kafka to rdkafka
    • The waterdrop gem changed too
    • The configuration must change accordingly
    • The librdkafka-devel package is required
  • Responders are removed
  • Consumers receive multiple messages
    • The params attribute in BaseConsumer is renamed to messages
  • Pidfile support is removed

Install librdkafka-devel

Done by the infra team.

Modify gem versions

  • karafka: 2.0.0.beta3
  • karafka-testing: 2.0.0.alpha3

Fix until all tests pass

waterdrop client

# Delete config/initializers/waterdrop.rb
WaterDrop.setup do |config|
  config.client_id = 'myapp'
  config.logger = Rails.logger

  config.kafka.seed_brokers = Settings.kafka.urls
  config.kafka.sasl_scram_username = Settings.kafka.sasl_scram_username
  config.kafka.sasl_scram_password = Settings.kafka.sasl_scram_password
  config.kafka.sasl_scram_mechanism = Settings.kafka.sasl_scram_mechanism
  config.kafka.ssl_ca_certs_from_system = Settings.kafka.ssl_ca_certs_from_system

  config.deliver = !Rails.env.test?
end

# Add lib/kafka_producer.rb
require 'singleton'

class KafkaProducer
  include Singleton

  def initialize
    # Singleton runs this once, so plain assignment is enough
    @producer = WaterDrop::Producer.new do |config|
      config.deliver = !Rails.env.test?

      # rdkafka settings, see:
      # https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
      config.kafka = Settings.kafka.config.to_h
    end
  end

  # For now, sync only
  def produce(topic, payload)
    producer.produce_sync(topic: topic, payload: payload)
  end

  # Align interface with WaterDrop v1
  def self.call(payload, topic:)
    instance.produce(topic, payload)
  end

  private

  attr_reader :producer
end

Delete Responder

rm -rf app/responders

karafka.rb

# Delete eager load
# Rails.application.eager_load!

# Adjust to rdkafka
config.kafka = Settings.kafka.config.to_h

# Fix subscribers
# Karafka::Instrumentation::StdoutListener no longer exists in v2;
# use Karafka::Instrumentation::LoggerListener instead. Same for WaterDrop
Karafka.monitor.subscribe(Karafka::Instrumentation::LoggerListener.new)
Karafka.producer.monitor.subscribe(WaterDrop::Instrumentation::LoggerListener.new(Karafka.logger))

# Delete
# KarafkaApp.boot!

Config

# config/settings.yml
kafka:
  config:
    bootstrap.servers: localhost:9096
    client.id: myapp
  topics:
    :
    
# config/settings/xxx.yml
kafka:
  config:
    bootstrap.servers: ...
    # write the settings needed to connect to the broker
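
The dotted keys under kafka.config are librdkafka property names, and the whole hash is handed to config.kafka unchanged. Below is a minimal sketch of that shape using only the standard yaml library in place of the Settings object. The load_kafka_config helper is hypothetical, the values are placeholders, and the symbol-keyed result is an assumption about what Settings.kafka.config.to_h returns.

```ruby
require 'yaml'

# Sample settings content; broker address and client id are placeholders.
SETTINGS_YAML = <<~YAML
  kafka:
    config:
      bootstrap.servers: localhost:9096
      client.id: myapp
YAML

# Hypothetical helper: returns the kafka.config section as a flat hash with
# symbol keys (assumed to match what Settings.kafka.config.to_h produces).
def load_kafka_config(yaml_text)
  settings = YAML.safe_load(yaml_text)
  settings.fetch('kafka').fetch('config').transform_keys(&:to_sym)
end

kafka_config = load_kafka_config(SETTINGS_YAML)
kafka_config.each { |key, value| puts "#{key} = #{value}" }
```

Each key stays a single dotted name (for example :"bootstrap.servers"); it is not split into nested hashes.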

Use KafkaProducer

  • Replace WaterDrop::SyncProducer.call with KafkaProducer.call
  • Add require 'kafka_producer' where it is used
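
Because KafkaProducer.call keeps the v1 argument order (payload first, topic as a keyword), only the receiver changes at each call site. The sketch below shows this with stand-ins so it runs without Kafka: RecordingClient and SketchProducer are hypothetical names, not part of WaterDrop, and the topic and payload are made up.

```ruby
require 'singleton'

# Stand-in for WaterDrop::Producer; it only records what would be sent.
class RecordingClient
  attr_reader :sent

  def initialize
    @sent = []
  end

  def produce_sync(topic:, payload:)
    @sent << { topic: topic, payload: payload }
  end
end

# Same shape as KafkaProducer, with the client swapped for the stand-in.
class SketchProducer
  include Singleton

  attr_reader :client

  def initialize
    @client = RecordingClient.new
  end

  def produce(topic, payload)
    client.produce_sync(topic: topic, payload: payload)
  end

  # v1-style entry point: payload first, topic as a keyword.
  def self.call(payload, topic:)
    instance.produce(topic, payload)
  end
end

# Before: WaterDrop::SyncProducer.call('{"id":1}', topic: 'events')
# After:  only the receiver is different.
SketchProducer.call('{"id":1}', topic: 'events')
```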

Fix for karafka-testing

  • Replace karafka_consumer_for with karafka.consumer_for
  • Replace publish_for_karafka with karafka.publish

Consumer

Fix #consume method

Example implementation: https://github.com/karafka/karafka/blob/2.0/spec/integrations/consumption/single_message_consumer.rb#L9-L28
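
A rough sketch of the change, following the params to messages rename noted in the prerequisites. To keep it runnable without the karafka gem, Message and SketchBaseConsumer are stand-ins; in a real app the consumer would inherit from Karafka::BaseConsumer, which supplies #messages. EventsConsumer and its handle method are hypothetical.

```ruby
# Stand-ins so the sketch runs without the karafka gem.
Message = Struct.new(:payload)

class SketchBaseConsumer
  attr_accessor :messages
end

class EventsConsumer < SketchBaseConsumer
  attr_reader :handled

  # v1 style handled one message through params.
  # v2 style: #consume gets a batch, so iterate over messages.
  def consume
    @handled = []
    messages.each { |message| handle(message.payload) }
  end

  private

  # Hypothetical per-message business logic.
  def handle(payload)
    @handled << payload
  end
end

consumer = EventsConsumer.new
consumer.messages = [Message.new({ 'id' => 1 }), Message.new({ 'id' => 2 })]
consumer.consume
puts consumer.handled.inspect
```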


Fix pidfile management in karafka.rb

We run karafka as a foreground process under daemontools. During deployment, we send a TERM signal to the PID recorded in the pidfile.
But pidfile support is removed in v2!

It is hard to change the runfile at the same moment each product upgrades karafka to v2.
Therefore, change the pidfile management on v1.4 first and deploy it, then modify the runfile in all consumer apps.

  1. Write the pidfile to a fixed path in a mixin module. Do not read the path from cli.options
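    require 'fileutils'

    module KarafkaCliServerMixin
      def call
        # We manage the foreground process with daemontools.
        # Deployment sends a TERM signal to the PID recorded in the pidfile.
        pidfile = Rails.root.join('tmp/pids/karafka.pid')
        FileUtils.mkdir_p(File.dirname(pidfile))
        File.write(pidfile, ::Process.pid.to_s)

        super
      ensure
        # rm_f does not raise when the file is missing, so an earlier
        # failure is not masked by a second error here.
        FileUtils.rm_f(pidfile) if pidfile
      end
    end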
    
    Karafka::Cli::Server.prepend(KarafkaCliServerMixin)
  2. Remove the -p option from the daemontools runfile
  3. Upgrade to v2 whenever you like
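
The write-then-clean-up pattern from step 1 can be tried in isolation. This is a minimal sketch, assuming a temporary directory instead of Rails.root; with_pidfile is a hypothetical helper, not part of karafka.

```ruby
require 'fileutils'
require 'tmpdir'

# Hypothetical helper: writes the current PID to `path`, yields, and removes
# the file afterwards even if the block raises.
def with_pidfile(path)
  FileUtils.mkdir_p(File.dirname(path))
  File.write(path, Process.pid.to_s)
  yield
ensure
  # rm_f ignores a missing file, so cleanup never raises on its own.
  FileUtils.rm_f(path)
end

Dir.mktmpdir do |dir|
  pidfile = File.join(dir, 'tmp/pids/karafka.pid')

  with_pidfile(pidfile) do
    # While the server runs, a deploy script can read this PID and send TERM.
    puts "pidfile contains #{File.read(pidfile)}"
  end

  puts "pidfile removed: #{!File.exist?(pidfile)}"
end
```

Removing the file in ensure keeps a stale pidfile from pointing at an unrelated process after the server exits.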

... or change daemon management to systemd
