Created
August 6, 2015 00:02
-
-
Save sheeley/f2c718efb643add1ba3f to your computer and use it in GitHub Desktop.
MirrorMaker Partition Test
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This test creates keyed messages and pushes them to one Kafka broker (the source).
MirrorMaker consumes from the source and pushes to the destination.
After publishing, the test consumes first from the source, then from the destination, and compares the per-partition message counts.
Output:
running with topic EQEVYCTWOV | |
publishing 10000 messages to EQEVYCTWOV | |
partition source destination match | |
0 833 1666 FALSE | |
1 1667 2501 FALSE | |
2 1668 1667 FALSE | |
3 1667 1666 FALSE | |
4 2499 833 FALSE | |
5 1666 1667 FALSE | |
total 10000 10000 | |
running with topic KMGXKOZHRG | |
publishing 100 messages to KMGXKOZHRG | |
partition source destination match | |
0 8 16 FALSE | |
1 17 26 FALSE | |
2 18 17 FALSE | |
3 17 16 FALSE | |
4 24 8 FALSE | |
5 16 17 FALSE | |
total 100 100 | |
running with topic MJKSFIRPUS | |
publishing 40000 messages to MJKSFIRPUS | |
partition source destination match | |
0 3333 6666 FALSE | |
1 6667 10001 FALSE | |
2 6668 6667 FALSE | |
3 6667 6666 FALSE | |
4 9999 3333 FALSE | |
5 6666 6667 FALSE | |
total 40000 40000 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from kafka import KeyedProducer, KafkaClient, KafkaConsumer | |
from kafka.partitioner import Murmur2Partitioner | |
from collections import defaultdict | |
from interruptingcow import timeout | |
from time import sleep | |
import random | |
import string | |
# Broker endpoints: messages are produced to the source cluster and mirrored
# (by MirrorMaker, running externally) to the destination cluster.
SRC_BROKERS = ['localhost:9092']
DEST_BROKERS = ['localhost:9093']
# Seconds to wait on a consumer before giving up (used by summarize()).
TIMEOUT = 10
# Note that the application is responsible for encoding messages to type bytes
# Fixed pool of UUID-like message keys, cycled through when publishing so the
# partitioner's key -> partition assignment is deterministic across runs.
KEYS = [
    "11111111-2222-1111-1111-111111111111",
    "067e6162-3b6f-4ae2-a171-2470b63dff00",
    "057f6162-3a6d-4ae2-a171-2470b63dee22",
    "11111111-3333-3333-3333-111111111111",
    "057f6162-3a6d-4ae2-a171-2470b63dee00",
    "057f6152-3b6f-4ae2-a171-2470b63dee99",
    "067e6162-3b6f-4ae2-a171-2470b63dff99",
    "11111111-3333-7777-7777-111111111111",
    "00000000-0001-8888-8888-888888888888",
    "057f6162-abcd-4ae2-a171-2470b63dee11",
    "22226162-eeee-4ae2-a171-2470b6222222",
    "22226162-111e-4ae2-a171-2470b6222222"]
class Timeout(Exception):
    """Raised when the consume loop exceeds the TIMEOUT deadline."""
def create_topic(topic, clients):
    """Ensure *topic* exists on every Kafka client, then pause briefly.

    The sleep gives the brokers time to propagate topic metadata before
    the test starts publishing.
    """
    for client in clients:
        client.ensure_topic_exists(topic)
    sleep(5)
def publish_messages(topic, src_client, message_count=10000): | |
print "publishing %d messages to %s" % (message_count, topic) | |
producer = KeyedProducer(src_client, partitioner=Murmur2Partitioner) | |
for i in range(0, message_count): | |
key = KEYS[i % 12] | |
producer.send_messages(topic, key, 'some message %d' % i) | |
def summarize(consumer, message_count):
    """Count consumed messages per partition.

    Stops after *message_count* messages or when TIMEOUT seconds elapse,
    whichever comes first, and returns a {partition: count} mapping with
    whatever was consumed by then.
    """
    per_partition = defaultdict(int)
    seen = 0
    try:
        with timeout(TIMEOUT, exception=Timeout):
            for msg in consumer:
                per_partition[msg.partition] += 1
                seen += 1
                if seen == message_count:
                    break
    except Timeout:
        # Deadline hit: fall through with the partial counts collected so far.
        pass
    return per_partition
def consume_messages(topic, src_brokers, dest_brokers, message_count):
    """Consume *topic* from the source then the destination cluster.

    Prints a per-partition comparison table and returns a two-element list
    [source_counts, dest_counts], each a {partition: count} mapping.
    """
    # Removed the dead `idx` counter the original kept: it was initialized
    # and incremented but never read.
    messages = []
    for brokers in [src_brokers, dest_brokers]:
        consumer = KafkaConsumer(topic,
                                 group_id=topic + '-test',
                                 bootstrap_servers=brokers,
                                 auto_offset_reset='smallest')
        messages.append(summarize(consumer, message_count))
    print_comparison(messages)
    return messages
def print_comparison(messages): | |
src = dict(messages[0]) | |
dest = dict(messages[1]) | |
src_total = sum(src.values()) | |
dest_total = sum(dest.values()) | |
partitions = set(src.keys() + dest.keys()) | |
outputs = [] | |
for k in partitions: | |
s_v = src.get(k, 0) | |
d_v = dest.get(k, 0) | |
match = "" if s_v == d_v else "FALSE" | |
outputs.append("{0} {1:21} {2:16} {3}".format(k, s_v, d_v, match)) | |
print "partition source destination match" | |
print '\n'.join(outputs) | |
print "total {0:17} {1:16}".format(src_total, dest_total) | |
print "\n" | |
def run_full_pass(topic, src_brokers, dest_brokers, message_count=10000): | |
print "running with topic %s" % topic | |
src_client = KafkaClient(src_brokers) | |
dest_client = KafkaClient(dest_brokers) | |
create_topic(topic, [src_client, dest_client]) | |
publish_messages(topic, src_client, message_count) | |
return consume_messages(topic, src_brokers, dest_brokers, message_count) | |
def generate_topic():
    """Return a random 10-character uppercase ASCII topic name."""
    letters = (random.choice(string.ascii_uppercase) for _ in range(10))
    return ''.join(letters)
if __name__ == '__main__':
    # One full pass per message volume: the default size, a small run,
    # and a large run, each against a fresh random topic.
    for count in (10000, 100, 40000):
        run_full_pass(generate_topic(), SRC_BROKERS, DEST_BROKERS, count)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment