Created
June 3, 2016 20:22
-
-
Save virtuald/d7416e06d0d348af30aa1425fb67a5b6 to your computer and use it in GitHub Desktop.
NSQ performance tester
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# | |
# Run this in three different terminal windows (presumes download of NSQD): | |
# - rm *.dat && ./nsqd | |
# - ./raw_perf.py read | |
# - ./raw_perf.py write | |
# | |
from __future__ import print_function | |
import os | |
import sys | |
import uuid | |
import time | |
from nsq import Reader, Writer | |
from tornado.ioloop import IOLoop | |
if False: | |
# Docker containers with NSQ | |
nsqd_addresses = ["172.17.0.4:4150"] | |
nsqlookupd_addresses = ["172.17.0.2:4161"] | |
else: | |
# Local instance of NSQD | |
nsqd_addresses = ['127.0.0.1:4150'] | |
nsqlookupd_addresses = [] | |
CHANNEL = "test.channel" | |
# Change this to enable/disable profiling | |
PROFILE = False | |
import logging | |
logger = logging.getLogger('perf') | |
class Test(object): | |
def __init__(self): | |
self.messages = 0 | |
self.start = None | |
self.writer = Writer(nsqd_addresses) | |
def begin_profile(self): | |
import cProfile | |
self.profiler = cProfile.Profile() | |
self.profiler.enable() | |
def end_profile(self): | |
self.profiler.disable() | |
self.profiler.dump_stats("raw_perf.prof") | |
def init_reader(self): | |
# Causes the channel to exist before we create a reader for it.. | |
self.writer.pub(CHANNEL, 'init') | |
queue_name = str(uuid.uuid4()).replace("-", "") #+ '#ephemeral' | |
self.reader = Reader(message_handler=self.on_message, | |
nsqd_tcp_addresses=nsqd_addresses, | |
topic=CHANNEL, | |
channel=queue_name, | |
max_in_flight=200) | |
def publish(self): | |
data = os.urandom(367) | |
for i in xrange(50001): | |
self.writer.pub(CHANNEL, data) | |
print("Done") | |
def on_message(self, message): | |
message.finish() | |
if message.body == 'init': | |
return True | |
if not self.start: | |
self.start = time.time() | |
# simulate a load... | |
#time.sleep(0.0001) | |
if self.messages % 1000 == 0: | |
logger.info("Received: %s", self.messages) | |
if self.messages == 50000: | |
logger.info("Got %s in %.3f seconds", self.messages, time.time() - self.start) | |
IOLoop.current().stop() | |
self.messages += 1 | |
return True | |
if __name__ == '__main__': | |
logging.basicConfig(level=logging.INFO) | |
test = Test() | |
if PROFILE: | |
IOLoop.current().add_callback(test.begin_profile) | |
if sys.argv[1] == 'read': | |
test.init_reader() | |
else: | |
IOLoop.current().call_later(0.5, test.publish) | |
IOLoop.current().start() | |
if PROFILE: | |
test.end_profile() | |
logger.info("Done") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment