Skip to content

Instantly share code, notes, and snippets.

@virtuald
Created June 3, 2016 20:22
Show Gist options
  • Save virtuald/d7416e06d0d348af30aa1425fb67a5b6 to your computer and use it in GitHub Desktop.
Save virtuald/d7416e06d0d348af30aa1425fb67a5b6 to your computer and use it in GitHub Desktop.
NSQ performance tester
#!/usr/bin/env python
#
# Run this in three different terminal windows (presumes download of NSQD):
# - rm *.dat && ./nsqd
# - ./raw_perf.py read
# - ./raw_perf.py write
#
from __future__ import print_function
import os
import sys
import uuid
import time
from nsq import Reader, Writer
from tornado.ioloop import IOLoop
# Flip this to True to target NSQ running in Docker containers instead of a
# local nsqd instance.
USE_DOCKER = False

if USE_DOCKER:
    # Docker containers with NSQ
    nsqd_addresses = ["172.17.0.4:4150"]
    nsqlookupd_addresses = ["172.17.0.2:4161"]
else:
    # Local instance of NSQD
    nsqd_addresses = ['127.0.0.1:4150']
    nsqlookupd_addresses = []

# Name passed as the topic to both Writer.pub() and Reader(topic=...)
CHANNEL = "test.channel"

# Change this to enable/disable profiling
PROFILE = False

import logging
logger = logging.getLogger('perf')
class Test(object):
    """Drive one side (reader or writer) of a simple NSQ throughput test.

    The writer side publishes a fixed batch of random payloads to ``CHANNEL``;
    the reader side counts incoming messages, logs progress, and stops the
    IOLoop once the expected number has been seen, reporting elapsed time.
    """

    # Number of messages the reader counts before it reports and stops.
    MESSAGE_COUNT = 50000
    # The reader logs a progress line every this many messages.
    REPORT_INTERVAL = 1000
    # Size in bytes of the random payload published by the writer.
    PAYLOAD_SIZE = 367
    # Body of the priming message sent by init_reader(). A bytes literal so
    # the comparison in on_message() also works on Python 3; on Python 2 it
    # is the same value as 'init'.
    INIT_BODY = b'init'

    def __init__(self):
        self.messages = 0    # count of non-init messages handled so far
        self.start = None    # time.time() of the first counted message
        self.writer = Writer(nsqd_addresses)

    def begin_profile(self):
        """Create a cProfile profiler and start collecting."""
        import cProfile
        self.profiler = cProfile.Profile()
        self.profiler.enable()

    def end_profile(self):
        """Stop the profiler and dump its stats to ``raw_perf.prof``."""
        self.profiler.disable()
        self.profiler.dump_stats("raw_perf.prof")

    def init_reader(self):
        """Publish a priming message, then attach a Reader on a fresh channel."""
        # Causes the channel to exist before we create a reader for it..
        self.writer.pub(CHANNEL, self.INIT_BODY)

        # Random, dash-free channel name so every run reads from its own queue.
        queue_name = uuid.uuid4().hex  # + '#ephemeral'
        self.reader = Reader(message_handler=self.on_message,
                             nsqd_tcp_addresses=nsqd_addresses,
                             topic=CHANNEL,
                             channel=queue_name,
                             max_in_flight=200)

    def publish(self):
        """Publish the whole batch of identical random payloads to CHANNEL."""
        data = os.urandom(self.PAYLOAD_SIZE)
        # One more than MESSAGE_COUNT: on_message() checks its counter before
        # incrementing, so the stop condition fires on message MESSAGE_COUNT + 1.
        for _ in range(self.MESSAGE_COUNT + 1):
            self.writer.pub(CHANNEL, data)
        print("Done")

    def on_message(self, message):
        """Handle one delivered message.

        Finishes the message, ignores the priming message, and otherwise
        updates the counter, logging progress and stopping the IOLoop once
        MESSAGE_COUNT messages have already been counted.

        Always returns True.
        """
        message.finish()

        if message.body == self.INIT_BODY:
            return True

        # Start the clock on the first real message.
        if self.start is None:
            self.start = time.time()

        # simulate a load...
        #time.sleep(0.0001)

        if self.messages % self.REPORT_INTERVAL == 0:
            logger.info("Received: %s", self.messages)

        if self.messages == self.MESSAGE_COUNT:
            logger.info("Got %s in %.3f seconds", self.messages, time.time() - self.start)
            IOLoop.current().stop()

        self.messages += 1
        return True
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)

    # Fail with a usage hint instead of an IndexError when no mode is given.
    if len(sys.argv) < 2:
        sys.exit("usage: %s read|write" % sys.argv[0])

    test = Test()

    if PROFILE:
        # Start profiling from inside the loop so setup above is not measured.
        IOLoop.current().add_callback(test.begin_profile)

    # 'read' attaches a consumer; any other argument runs the publisher.
    if sys.argv[1] == 'read':
        test.init_reader()
    else:
        # Publishing is deferred by half a second after the loop starts --
        # presumably to let the Writer connect first (TODO confirm).
        IOLoop.current().call_later(0.5, test.publish)

    try:
        IOLoop.current().start()
    finally:
        # publish() never stops the loop, so write mode only ends via an
        # exception such as Ctrl-C; dump the profile on that path too.
        if PROFILE:
            test.end_profile()

    logger.info("Done")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment