MirrorMaker Partition Test
This test creates keyed messages and publishes them to one Kafka broker (the source). MirrorMaker consumes from the source and pushes to the destination. After publishing, the test consumes the topic first from the source, then from the destination, and compares the per-partition message counts.
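Because the producer hashes each key with Murmur2, every message with a given key lands on the same source partition, so if MirrorMaker preserved the partition assignment (or re-partitioned with the same key hashing), the per-partition counts on both clusters would match. A minimal sketch of that determinism, assuming six partitions (0-5, as in the topics below) and the kafka-python partitioner interface of that era, where a partitioner is constructed with the partition list and exposes partition(key):

from kafka.partitioner import Murmur2Partitioner

# Assumed: six partitions, numbered 0-5, matching the test topics below.
partitioner = Murmur2Partitioner(range(6))

# The same key always hashes to the same partition; this key is one of the
# twelve used by the script further down.
print partitioner.partition("11111111-2222-1111-1111-111111111111")

The output below shows that the per-partition counts do not match, even though the totals do.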
Output:
running with topic EQEVYCTWOV
publishing 10000 messages to EQEVYCTWOV
partition    source    destination    match
0            833       1666           FALSE
1            1667      2501           FALSE
2            1668      1667           FALSE
3            1667      1666           FALSE
4            2499      833            FALSE
5            1666      1667           FALSE
total        10000     10000

running with topic KMGXKOZHRG
publishing 100 messages to KMGXKOZHRG
partition    source    destination    match
0            8         16             FALSE
1            17        26             FALSE
2            18        17             FALSE
3            17        16             FALSE
4            24        8              FALSE
5            16        17             FALSE
total        100       100

running with topic MJKSFIRPUS
publishing 40000 messages to MJKSFIRPUS
partition    source    destination    match
0            3333      6666           FALSE
1            6667      10001          FALSE
2            6668      6667           FALSE
3            6667      6666           FALSE
4            9999      3333           FALSE
5            6666      6667           FALSE
total        40000     40000
from kafka import KeyedProducer, KafkaClient, KafkaConsumer
from kafka.partitioner import Murmur2Partitioner
from collections import defaultdict
from interruptingcow import timeout
from time import sleep
import random
import string

SRC_BROKERS = ['localhost:9092']
DEST_BROKERS = ['localhost:9093']
TIMEOUT = 10

# Note that the application is responsible for encoding messages to type bytes
KEYS = [
    "11111111-2222-1111-1111-111111111111",
    "067e6162-3b6f-4ae2-a171-2470b63dff00",
    "057f6162-3a6d-4ae2-a171-2470b63dee22",
    "11111111-3333-3333-3333-111111111111",
    "057f6162-3a6d-4ae2-a171-2470b63dee00",
    "057f6152-3b6f-4ae2-a171-2470b63dee99",
    "067e6162-3b6f-4ae2-a171-2470b63dff99",
    "11111111-3333-7777-7777-111111111111",
    "00000000-0001-8888-8888-888888888888",
    "057f6162-abcd-4ae2-a171-2470b63dee11",
    "22226162-eeee-4ae2-a171-2470b6222222",
    "22226162-111e-4ae2-a171-2470b6222222"]


class Timeout(Exception):
    pass


def create_topic(topic, clients):
    # Create the topic on both clusters so MirrorMaker has somewhere to write.
    for c in clients:
        # print "creating topic %s on %s kafka" % (topic, c.hosts)
        c.ensure_topic_exists(topic)
    sleep(5)


def publish_messages(topic, src_client, message_count=10000):
    print "publishing %d messages to %s" % (message_count, topic)
    # Keyed producer: each message is routed to a partition by hashing its key.
    producer = KeyedProducer(src_client, partitioner=Murmur2Partitioner)
    for i in range(0, message_count):
        key = KEYS[i % len(KEYS)]
        producer.send_messages(topic, key, 'some message %d' % i)


def summarize(consumer, message_count):
    # Count messages per partition until message_count is reached
    # or TIMEOUT seconds pass without finishing.
    count = 0
    consumer_partitions = defaultdict(int)
    try:
        with timeout(TIMEOUT, exception=Timeout):
            for message in consumer:
                count += 1
                consumer_partitions[message.partition] += 1
                if count == message_count:
                    break
    except Timeout:
        pass
    return consumer_partitions


def consume_messages(topic, src_brokers, dest_brokers, message_count):
    # Consume the topic from the source cluster, then from the destination,
    # recording per-partition counts for each.
    messages = []
    for brokers in [src_brokers, dest_brokers]:
        # print "consuming messages from %s" % brokers
        consumer = KafkaConsumer(topic,
                                 group_id=topic + '-test',
                                 bootstrap_servers=brokers,
                                 auto_offset_reset='smallest')
        messages.append(summarize(consumer, message_count))
    print_comparison(messages)
    return messages


def print_comparison(messages):
    src = dict(messages[0])
    dest = dict(messages[1])
    src_total = sum(src.values())
    dest_total = sum(dest.values())
    partitions = set(src.keys() + dest.keys())
    outputs = []
    for k in sorted(partitions):
        s_v = src.get(k, 0)
        d_v = dest.get(k, 0)
        match = "" if s_v == d_v else "FALSE"
        outputs.append("{0} {1:21} {2:16} {3}".format(k, s_v, d_v, match))
    print "partition source destination match"
    print '\n'.join(outputs)
    print "total {0:17} {1:16}".format(src_total, dest_total)
    print "\n"


def run_full_pass(topic, src_brokers, dest_brokers, message_count=10000):
    print "running with topic %s" % topic
    src_client = KafkaClient(src_brokers)
    dest_client = KafkaClient(dest_brokers)
    create_topic(topic, [src_client, dest_client])
    publish_messages(topic, src_client, message_count)
    return consume_messages(topic, src_brokers, dest_brokers, message_count)


def generate_topic():
    return ''.join([random.choice(string.ascii_uppercase) for _ in range(10)])


if __name__ == '__main__':
    run_full_pass(generate_topic(), SRC_BROKERS, DEST_BROKERS)
    run_full_pass(generate_topic(), SRC_BROKERS, DEST_BROKERS, 100)
    run_full_pass(generate_topic(), SRC_BROKERS, DEST_BROKERS, 40000)
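run_full_pass returns the two per-partition count dicts (source first, then destination), so a caller could assert on them directly instead of reading the printed table. A minimal usage sketch, assuming the same local broker setup and a hypothetical 1000-message pass:

# Hypothetical assertion wrapper around run_full_pass: fails if any partition's
# count differs between the source and destination clusters.
src_counts, dest_counts = run_full_pass(generate_topic(), SRC_BROKERS, DEST_BROKERS, 1000)
for p in sorted(set(src_counts) | set(dest_counts)):
    assert src_counts[p] == dest_counts[p], \
        "partition %d: source=%d destination=%d" % (p, src_counts[p], dest_counts[p])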