Created
May 20, 2016 18:12
-
-
Save jturkel/44716874997bab8a08911c06638cdcf5 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'docker'

# Point the docker-api client at the daemon named by DOCKER_HOST when that
# variable is set; when it is not set, Docker.url is not assigned here.
Docker.url = ENV['DOCKER_HOST'] if ENV['DOCKER_HOST']
# Manages a Kafka + Zookeeper cluster for tests via Docker.
#
# The Kafka broker's container port 9092 is published on host port 9093 to
# avoid conflicts with anything already using port 9092.
#
# Creating an instance (directly or through KafkaTestCluster.start) fetches
# the images when they are missing, starts a Zookeeper container and a Kafka
# container linked to it, and waits for a TCP connection to the broker to
# succeed. Call #stop to remove both containers.
class KafkaTestCluster
  # We should switch this to confluent/kafka when they release a 0.9.0.1 image
  KAFKA_IMAGE = 'ches/kafka:0.9.0.1'.freeze
  ZOOKEEPER_IMAGE = 'confluent/zookeeper:3.4.6-cp1'.freeze
  KAFKA_LOG = 'log/kafka.log'.freeze
  ZOOKEEPER_LOG = 'log/zookeeper.log'.freeze
  # Host port the broker is published on and advertises to clients. Kept as a
  # bare port number string, which is the form the Docker API documents for
  # HostPort.
  KAFKA_HOST_PORT = '9093'.freeze

  # Raised by Kafka CLI commands that exit with a non-zero status.
  CommandFailedError = Class.new(StandardError)

  # NOTE(review): private_attr_reader is not defined in this file; presumably
  # a project-level extension that declares private readers - confirm.
  private_attr_reader :zookeeper_container, :kafka_container
  delegate :docker_host, to: 'self.class'
  alias_method :kafka_host, :docker_host

  # Starts a new cluster.
  #
  # @return [KafkaTestCluster] the started cluster
  def self.start
    new
  end

  # Host name or address used to reach ports published by Docker. The value
  # is memoized on the class.
  #
  # On CircleCI (CIRCLECI present in ENV) this is the address that
  # `ifconfig docker0` reports after "inet addr:"; otherwise it is the host
  # part of the DOCKER_HOST URL.
  #
  # @return [String]
  # @raise [KeyError] when not on CircleCI and DOCKER_HOST is not set
  def self.docker_host
    @docker_host ||=
      if ENV.include?('CIRCLECI')
        `/sbin/ifconfig docker0 | grep "inet addr" | cut -d ':' -f 2 | cut -d ' ' -f 1`.strip
      else
        URI(ENV.fetch('DOCKER_HOST')).host
      end
  end

  # Validates the Docker version, fetches missing images and starts Zookeeper
  # followed by Kafka. If any step raises, whatever was started is removed
  # via #stop and the error is re-raised.
  def initialize
    Rails.logger.info('Starting Kafka cluster')
    Docker.validate_version!
    download_images
    start_zookeeper
    start_kafka
    Rails.logger.info('Kafka cluster started')
  rescue
    stop
    raise
  end

  # @return [Array<String>] a single "host:port" entry for the broker
  def seed_brokers
    ["#{kafka_host}:#{kafka_port}"]
  end

  # Host port bound to the broker's container port 9092, read from the
  # running container's network settings and memoized.
  #
  # @return [String] the port number as Docker reports it
  def kafka_port
    @kafka_port ||= kafka_container.json
                                   .fetch('NetworkSettings')
                                   .fetch('Ports')
                                   .fetch('9092/tcp')
                                   .first
                                   .fetch('HostPort')
  end

  # Creates a topic with replication factor 1 by running kafka-topics.sh in a
  # throwaway container.
  #
  # @param topic [String] topic name
  # @param num_partitions [Integer] number of partitions for the topic
  # @return [String] output of the command
  # @raise [CommandFailedError] when the command exits with a non-zero status
  def create_topic(topic, num_partitions: 1)
    Rails.logger.info("Creating topic #{topic}")
    run_kafka_command(
      '/kafka/bin/kafka-topics.sh',
      '--create',
      "--topic=#{topic}",
      '--replication-factor=1',
      "--partitions=#{num_partitions}",
      '--zookeeper=zookeeper'
    )
  end

  # Deletes a topic by running kafka-topics.sh in a throwaway container.
  # A failure whose message says the topic does not exist is swallowed.
  #
  # @param topic [String] topic name
  # @return [String, nil] command output, or nil when the topic did not exist
  # @raise [CommandFailedError] for any other command failure
  def delete_topic(topic)
    Rails.logger.info("Deleting topic #{topic}")
    run_kafka_command(
      '/kafka/bin/kafka-topics.sh',
      '--delete',
      "--topic=#{topic}",
      '--zookeeper=zookeeper'
    )
  rescue CommandFailedError => e
    raise unless e.message.include?("Topic #{topic} does not exist")
  end

  # Force-removes the Kafka and Zookeeper containers. Containers that were
  # never created (nil) are skipped.
  def stop
    Rails.logger.info('Stopping Kafka cluster')
    begin
      safe_delete_container(kafka_container)
    ensure
      # Remove Zookeeper even when removing Kafka raised, so one failed
      # delete does not leave the other container behind.
      safe_delete_container(zookeeper_container)
    end
    Rails.logger.info('Kafka cluster stopped')
  end

  private

  # Fetches the Kafka and Zookeeper images unless Docker already has them.
  def download_images
    [KAFKA_IMAGE, ZOOKEEPER_IMAGE].each do |image|
      next if Docker::Image.exist?(image)

      Rails.logger.info("Fetching image #{image}")
      Docker::Image.create('fromImage' => image)
    end
  end

  # Creates and starts the Zookeeper container and begins appending its
  # output to ZOOKEEPER_LOG.
  #
  # NOTE(review): port bindings are passed to the start call here and in
  # #start_kafka. Docker's deprecation list describes host configuration at
  # container start as superseded by HostConfig at create time - confirm
  # against the Docker version in use.
  def start_zookeeper
    @zookeeper_container = Docker::Container.create(
      'Image' => ZOOKEEPER_IMAGE,
      'Hostname' => 'localhost',
      'ExposedPorts' => {
        '2181/tcp' => {}
      }
    )
    @zookeeper_container.start(
      'PortBindings' => {
        # An empty HostPort presumably lets Docker choose the host port.
        '2181/tcp' => [
          { 'HostPort' => '' }
        ]
      }
    )
    capture_container_logs(@zookeeper_container, ZOOKEEPER_LOG)
  end

  # Creates and starts the Kafka container linked to Zookeeper under the
  # alias "zookeeper", publishes container port 9092 on KAFKA_HOST_PORT,
  # begins appending its output to KAFKA_LOG and waits for the broker.
  def start_kafka
    @kafka_container = Docker::Container.create(
      'Image' => KAFKA_IMAGE,
      'Hostname' => 'localhost',
      'Links' => ["#{zookeeper_container.id}:zookeeper"],
      'ExposedPorts' => {
        '9092/tcp' => {}
      },
      'Env' => [
        'KAFKA_BROKER_ID=1',
        "KAFKA_ADVERTISED_HOST_NAME=#{docker_host}",
        "KAFKA_ADVERTISED_PORT=#{KAFKA_HOST_PORT}",
        'KAFKA_DELETE_TOPIC_ENABLE=true'
      ]
    )
    @kafka_container.start(
      'PortBindings' => {
        '9092/tcp' => [
          { 'HostPort' => KAFKA_HOST_PORT }
        ]
      }
    )
    capture_container_logs(@kafka_container, KAFKA_LOG)
    wait_for_kafka_started
  end

  # Attaches to the container on a background thread and appends every
  # received chunk to log_file.
  #
  # @param container [Docker::Container]
  # @param log_file [String] path of the file to append to
  # @return [Thread] the thread doing the copying
  def capture_container_logs(container, log_file)
    Thread.new do
      File.open(log_file, 'a') do |log|
        container.attach { |_stream, chunk| log.print(chunk) }
      end
    end
  end

  # Runs a command in a fresh container of the Kafka image linked to
  # Zookeeper, waits for it to exit and always removes the container.
  #
  # @param command [Array<String>] executable followed by its arguments
  # @return [String] combined stdout/stderr logs of the command
  # @raise [CommandFailedError] when the exit status is non-zero
  def run_kafka_command(*command)
    Rails.logger.debug("Running command #{command}")
    container = Docker::Container.create(
      'Image' => KAFKA_IMAGE,
      'Links' => ["#{zookeeper_container.id}:zookeeper"],
      'Cmd' => command
    )
    container.start
    status = container.wait.fetch('StatusCode')
    output = container.logs(stdout: true, stderr: true)
    Rails.logger.debug("Command exited with status=#{status}, output=\n#{output}")
    unless status.zero?
      raise CommandFailedError, "Kafka command #{command} failed with status #{status}:\n#{output}"
    end

    output
  ensure
    # container is nil here when creation itself failed; the helper skips nil.
    safe_delete_container(container)
  end

  # Waits until a TCP connection to the broker can be opened.
  #
  # NOTE(review): Wait.for is defined outside this file; presumably it
  # re-runs the block until it returns true or max_wait_time elapses - confirm.
  def wait_for_kafka_started
    Rails.logger.info('Waiting for Kafka to start')
    Wait.for('Kafka started', max_wait_time: 45.seconds) do
      begin
        TCPSocket.open(kafka_host, kafka_port).close
        true
      rescue
        # Any StandardError counts as "not ready yet".
        false
      end
    end
    Rails.logger.info('Kafka started')
  end

  # Force-deletes the container if there is one. On CircleCI a
  # Docker::Error::ServerError from the delete is ignored; elsewhere it is
  # re-raised.
  #
  # @param container [Docker::Container, nil]
  def safe_delete_container(container)
    container.delete(force: true) if container
  rescue Docker::Error::ServerError
    # This fails in Circle. See https://circleci.com/docs/docker-btrfs-error/
    raise unless ENV.include?('CIRCLECI')
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment