Skip to content

Instantly share code, notes, and snippets.

@jturkel
Created May 20, 2016 18:12
Show Gist options
  • Save jturkel/44716874997bab8a08911c06638cdcf5 to your computer and use it in GitHub Desktop.
Save jturkel/44716874997bab8a08911c06638cdcf5 to your computer and use it in GitHub Desktop.
require 'docker'
Docker.url = ENV['DOCKER_HOST'] if ENV['DOCKER_HOST']
# Manages a Kafka + Zookeeper cluster for tests via Docker.
# This will expose the Kafka server on port 9093 to avoid conflicts
# with port 9092.
class KafkaTestCluster
# We should switch this to confluent/kafka when they release a 0.9.0.1 image
KAFKA_IMAGE = 'ches/kafka:0.9.0.1'.freeze
ZOOKEEPER_IMAGE = 'confluent/zookeeper:3.4.6-cp1'.freeze
KAFKA_LOG = 'log/kafka.log'.freeze
ZOOKEEPER_LOG = 'log/zookeeper.log'.freeze
CommandFailedError = Class.new(StandardError)
private_attr_reader :zookeeper_container, :kafka_container
delegate :docker_host, to: 'self.class'
alias_method :kafka_host, :docker_host
def self.start
new
end
def initialize
Rails.logger.info('Starting Kafka cluster')
Docker.validate_version!
download_images
start_zookeeper
start_kafka
Rails.logger.info('Kafka cluster started')
rescue
stop
raise
end
def seed_brokers
["#{kafka_host}:#{kafka_port}"]
end
def kafka_port
return @kafka_port if @kafka_port
config = kafka_container.json.fetch('NetworkSettings').fetch('Ports')
@kafka_port = config.fetch('9092/tcp').first.fetch('HostPort')
end
def create_topic(topic, num_partitions: 1)
Rails.logger.info("Creating topic #{topic}")
run_kafka_command(
'/kafka/bin/kafka-topics.sh',
'--create',
"--topic=#{topic}",
'--replication-factor=1',
"--partitions=#{num_partitions}",
'--zookeeper=zookeeper'
)
end
def delete_topic(topic)
Rails.logger.info("Deleting topic #{topic}")
run_kafka_command(
'/kafka/bin/kafka-topics.sh',
'--delete',
"--topic=#{topic}",
'--zookeeper=zookeeper'
)
rescue CommandFailedError => e
raise unless e.message.include?("Topic #{topic} does not exist")
end
def stop
Rails.logger.info('Stopping Kafka cluster')
safe_delete_container(kafka_container)
safe_delete_container(zookeeper_container)
Rails.logger.info('Kafka cluster stopped')
end
def self.docker_host
@docker_host ||= if ENV.include?('CIRCLECI')
`/sbin/ifconfig docker0 | grep "inet addr" | cut -d ':' -f 2 | cut -d ' ' -f 1`.strip
else
URI(ENV.fetch('DOCKER_HOST')).host
end
end
private
def download_images
[KAFKA_IMAGE, ZOOKEEPER_IMAGE].each do |image|
unless Docker::Image.exist?(image)
Rails.logger.info("Fetching image #{image}")
Docker::Image.create('fromImage' => image)
end
end
end
def start_zookeeper
@zookeeper_container = Docker::Container.create(
'Image' => ZOOKEEPER_IMAGE,
'Hostname' => 'localhost',
'ExposedPorts' => {
'2181/tcp' => {}
}
)
@zookeeper_container.start(
'PortBindings' => {
'2181/tcp' => [
{ 'HostPort' => '' }
]
}
)
capture_container_logs(@zookeeper_container, ZOOKEEPER_LOG)
end
def start_kafka
@kafka_container = Docker::Container.create(
'Image' => KAFKA_IMAGE,
'Hostname' => 'localhost',
'Links' => ["#{zookeeper_container.id}:zookeeper"],
'ExposedPorts' => {
'9092/tcp' => {}
},
'Env' => [
'KAFKA_BROKER_ID=1',
"KAFKA_ADVERTISED_HOST_NAME=#{docker_host}",
'KAFKA_ADVERTISED_PORT=9093',
'KAFKA_DELETE_TOPIC_ENABLE=true'
]
)
@kafka_container.start(
'PortBindings' => {
'9092/tcp' => [
{ 'HostPort' => '9093/tcp' }
]
}
)
capture_container_logs(@kafka_container, KAFKA_LOG)
wait_for_kafka_started
end
def capture_container_logs(container, log_file)
Thread.new do
File.open(log_file, 'a') do |log|
container.attach do |_stream, chunk|
log.print(chunk)
end
end
end
end
def run_kafka_command(*command)
Rails.logger.debug("Running command #{command}")
container = Docker::Container.create(
'Image' => KAFKA_IMAGE,
'Links' => ["#{zookeeper_container.id}:zookeeper"],
'Cmd' => command
)
container.start
status = container.wait.fetch('StatusCode')
output = container.logs(stdout: true, stderr: true)
Rails.logger.debug("Command exited with status=#{status}, output=\n#{output}")
unless status.zero?
raise CommandFailedError, "Kafka command #{command} failed with status #{status}:\n#{output}"
end
output
ensure
safe_delete_container(container)
end
def wait_for_kafka_started
Rails.logger.info('Waiting for Kafka to start')
Wait.for('Kafka started', max_wait_time: 45.seconds) do
begin
socket = TCPSocket.open(kafka_host, kafka_port)
socket.close
true
rescue
false
end
end
Rails.logger.info('Kafka to started')
end
def safe_delete_container(container)
container.delete(force: true) if container
rescue Docker::Error::ServerError
# This fails in Circle. See https://circleci.com/docs/docker-btrfs-error/
raise unless ENV.include?('CIRCLECI')
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment