Last active
December 16, 2015 12:40
-
-
Save chmduquesne/5d730dcf297d064abde5 to your computer and use it in GitHub Desktop.
Testing the read-ahead feature of MQ in Python
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import logging
import logging.handlers
import math
import os
import signal
import sys
import time
from os import getenv

import pymqi
import pymqi.CMQC as CMQC
# Module-level logger named after this script; INFO and above go to stderr
# through a plain StreamHandler.
logger = logging.getLogger(os.path.basename(__file__))
logger.setLevel(logging.INFO)
handler_stderr = logging.StreamHandler()
logger.addHandler(handler_stderr)

# Short alias used throughout the script for informational messages.
log = logger.info
class Stats:
    """
    Collect per-topic message counts and sizes and log them periodically.

    Each call to update() records one message. Once at least `show_every`
    seconds have elapsed since the previous report, the accumulated figures
    are logged through the module-level `log` callable and the per-topic
    counters are reset.

    Usage:

    >>> s = Stats()
    >>> s.update("queue", 2048)
    """

    def __init__(self):
        # Wall-clock second of the last report (initially: construction time).
        self.last_shown = int(time.time())
        # Minimum number of seconds between two reports.
        self.show_every = 5
        # topic -> {"messages": count, "size": bytes} since the last report.
        self.infos = {}
        # Kept for backward compatibility; nothing in this class updates it.
        self.total_size = 0.0

    def size_fmt(self, nbytes, suffix='B'):
        """
        Return `nbytes` as a human-readable string using binary prefixes.

        The value is divided by 1024 until its magnitude drops below 1024,
        e.g. 2048 -> "2.0 KiB". Values too large for 'Zi' are shown in 'Yi'.
        """
        for unit in ['', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi']:
            if abs(nbytes) < 1024.0:
                return "%3.1f %s%s" % (nbytes, unit, suffix)
            nbytes /= 1024.0
        # Same "<value> <unit>" layout as every other unit above.
        return "%.1f %s%s" % (nbytes, 'Yi', suffix)

    def update(self, topic, size):
        """
        Record one message of `size` bytes for `topic`.

        Triggers show() when at least `show_every` seconds have passed since
        the last report.
        """
        if topic not in self.infos:
            self.infos[topic] = {"messages": 0, "size": 0}
        self.infos[topic]["messages"] += 1
        self.infos[topic]["size"] += size

        t = int(time.time())
        seconds_since_last_shown = t - self.last_shown
        if seconds_since_last_shown >= self.show_every:
            self.show(seconds_since_last_shown)
            self.last_shown = t

    @staticmethod
    def _ratio(numerator, denominator):
        """Return numerator / denominator as a float, or 0.0 if undefined."""
        if not denominator:
            return 0.0
        return float(numerator) / denominator

    def summary(self, duration, messages, size):
        """
        Return a one-line summary: messages/s, bytes/s and bytes/message.

        `duration` is in seconds, `size` in bytes. A zero `duration` or zero
        `messages` yields 0.0 for the affected ratios instead of raising
        ZeroDivisionError.
        """
        tps = self._ratio(messages, duration)
        throughput = self.size_fmt(self._ratio(size, duration))
        avg_size = self.size_fmt(self._ratio(size, messages))
        return "%.1f tps, %s/s, %s/msg" % (tps, throughput, avg_size)

    def show(self, duration):
        """
        Log one summary line per topic, then reset the per-topic counters.

        `duration` is the number of seconds the counters were accumulated for.
        """
        for topic, infos in self.infos.items():
            log("%s: Got %d messages (%s)" % (
                topic,
                infos["messages"],
                self.summary(duration, infos["messages"], infos["size"]),
            ))
        self.infos = {}
def _env_flag(name):
    """
    Return True when environment variable `name` holds an affirmative value.

    Unset, empty and explicitly negative values ("0", "false", "no", "off",
    case-insensitive) count as False; any other non-empty value counts as
    True, so existing setups that set the variable to anything else keep
    working.
    """
    value = (getenv(name) or "").strip().lower()
    return value not in ("", "0", "false", "no", "off")


def main():
    """
    Drain a queue and log throughput statistics, optionally with read-ahead.

    Configuration comes from environment variables:

    QMGR_HOST, QMGR_NAME, QMGR_CHANNEL, QUEUE_NAME -- required.
    SSL_CIPHER_SPEC, SSL_KEY_REPO -- optional; only applied when set.
    READ_AHEAD -- optional flag; opens the queue with MQOO_READ_AHEAD.

    Messages are read until the queue manager reports that no message is
    available. Any other MQ error is re-raised.

    Raises:
        SystemExit: if a required environment variable is not set.
        pymqi.MQMIError: for any MQ failure other than "no message available".
    """
    log("Starting.")

    QMGR_HOST = getenv("QMGR_HOST")
    QMGR_NAME = getenv("QMGR_NAME")
    QMGR_CHANNEL = getenv("QMGR_CHANNEL")
    SSL_CIPHER_SPEC = getenv("SSL_CIPHER_SPEC")
    SSL_KEY_REPO = getenv("SSL_KEY_REPO")
    queue_name = getenv("QUEUE_NAME")
    read_ahead = _env_flag("READ_AHEAD")

    # Fail early with a readable message instead of handing None to pymqi.
    required = (
        ("QMGR_HOST", QMGR_HOST),
        ("QMGR_NAME", QMGR_NAME),
        ("QMGR_CHANNEL", QMGR_CHANNEL),
        ("QUEUE_NAME", queue_name),
    )
    missing = [name for name, value in required if value is None]
    if missing:
        raise SystemExit(
            "Missing required environment variables: %s" % ", ".join(missing))

    cd = pymqi.CD()
    cd.ChannelName = QMGR_CHANNEL
    cd.ConnectionName = QMGR_HOST
    cd.ChannelType = CMQC.MQCHT_CLNTCONN
    cd.TransportType = CMQC.MQXPT_TCP
    # Leave the SSL fields at their defaults when no SSL settings are given.
    if SSL_CIPHER_SPEC:
        cd.SSLCipherSpec = SSL_CIPHER_SPEC

    sco = pymqi.SCO()
    if SSL_KEY_REPO:
        sco.KeyRepository = SSL_KEY_REPO

    qmgr = pymqi.QueueManager(None)
    qmgr.connect_with_options(QMGR_NAME, cd, sco)
    try:
        od = pymqi.OD()
        od.ObjectName = queue_name

        open_options = CMQC.MQOO_INPUT_AS_Q_DEF
        if read_ahead:
            log("Using read_ahead")
            open_options |= CMQC.MQOO_READ_AHEAD
        else:
            log("Not using read_ahead")
        queue = pymqi.Queue(qmgr, od, open_options)

        stats = Stats()
        n = 0
        try:
            while True:
                # A fresh descriptor per get, as in the original loop.
                md = pymqi.MD()
                gmo = pymqi.GMO()
                # NOTE(review): no WaitInterval is set, so how long the get
                # waits depends on the GMO default -- confirm.
                gmo.Options = (CMQC.MQGMO_WAIT |
                               CMQC.MQGMO_FAIL_IF_QUIESCING)
                message = queue.get(None, md, gmo)
                stats.update(queue_name, len(message))
                n += 1
        except pymqi.MQMIError as e:
            # Reason 2033 (no message available) is the normal end of the
            # run; everything else is a real failure.
            if e.reason != CMQC.MQRC_NO_MSG_AVAILABLE:
                raise
        finally:
            log("Got a total of %d messages" % n)
            queue.close()
    finally:
        # Always release the connection, even if opening the queue failed.
        qmgr.disconnect()
# Run the consumer only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I launch this with no arguments.