Created
May 23, 2018 10:16
-
-
Save dexterbt1/e5c49a04abdd0d54e04ff73b53d22204 to your computer and use it in GitHub Desktop.
stomp worker pattern
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import uuid | |
import threading | |
import stomp | |
class BcastMTWorkerListener(stomp.ConnectionListener): | |
def __init__(self, conn, receipt_waiting): | |
self.conn = conn | |
self.receipt_waiting = receipt_waiting | |
# --- | |
self.tx_id = None | |
self.tx_receipt = None | |
def on_error(self, headers, message): | |
print('%s: error "%s"' % (threading.current_thread(), message)) | |
sys.exit(1) | |
# logger.error | |
def on_message(self, message_headers, message): | |
print('%s: on_message start ... got message "%s"' % (threading.current_thread(), message)) | |
# logger.debug | |
self.tx_cond = threading.Condition() | |
self.tx_receipt = 'commit-' + str(uuid.uuid4()) | |
self.tx_id = self.conn.begin() | |
# TODO: FIXME: do translations here ... | |
out_message = message | |
self.conn.send('/queue/hello-reply', | |
out_message, | |
content_type='application/octet-stream', | |
headers={'persistent': 'true'}) | |
self.conn.ack( | |
message_headers['message-id'], | |
message_headers['subscription']) | |
self.conn.commit(self.tx_id, headers={'receipt': self.tx_receipt}) | |
with self.receipt_waiting: | |
self.receipt_waiting.notify() | |
print('%s: on_message done' % threading.current_thread()) | |
# logger.info('successfully ack+sent ....') | |
# --- | |
def on_receipt(self, headers, body): | |
if 'receipt-id' in headers and headers['receipt-id'] == self.tx_receipt: | |
with self.tx_cond: | |
self.tx_cond.notify() | |
def wait_on_tx_receipt(self): | |
with self.tx_cond: | |
self.tx_cond.wait() | |
def loop(self, receipt_waiting_cond): | |
while True: | |
# print('%s: active threads: %s' % (threading.current_thread(), threading.active_count())) | |
with receipt_waiting_cond: | |
receipt_waiting_cond.wait() | |
print("%s: waiting for tx_receipt ..." % threading.current_thread()) | |
self.wait_on_tx_receipt() | |
print("%s: got tx commit receipt." % threading.current_thread()) | |
# -- main | |
receipt_waiting_cond = threading.Condition() | |
conn = stomp.Connection([('localhost', '61613')], vhost='bcast') | |
worker = BcastMTWorkerListener(conn, receipt_waiting_cond) | |
conn.set_listener('', worker) | |
conn.start() | |
conn.connect('bcast', 'bcast', wait=True) | |
conn.subscribe(destination='/queue/hello', | |
ack='client-individual', | |
id='subscription-' + str(uuid.uuid4()), | |
headers={ | |
'prefetch-size': '1', | |
'persistent': 'true', | |
}) | |
worker.loop(receipt_waiting_cond) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment