Created
February 25, 2017 22:26
-
-
Save jmorton/333bf9d648526e55b07300aee9d592f7 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pika | |
import msgpack | |
import logging | |
import sys | |
from . import spark | |
from .app import logger | |
logger = logging.getLogger(__name__) | |
class MessagingException(Exception):
    """Exception type declared for messaging errors in this module.

    It adds nothing to ``Exception``; it only provides a distinct name
    that callers can raise or catch.
    """
def open_connection(host, port, ssl):
    """Open a blocking pika connection.

    ``host`` and ``port`` are passed positionally and ``ssl`` as the
    ``ssl`` keyword to ``pika.ConnectionParameters``; the resulting
    parameters are used to build and return a ``pika.BlockingConnection``.
    """
    return pika.BlockingConnection(
        pika.ConnectionParameters(host, port, ssl=ssl)
    )
def close_connection(conn):
    """Close ``conn`` when it is an open connection.

    ``None`` and connections whose ``is_open`` is falsy are left alone.
    Returns ``True`` in every case.
    """
    # Guard clause: nothing to close.
    if conn is None or not conn.is_open:
        return True
    conn.close()
    return True
def open_channel(conn):
    """Create a channel on ``conn`` and return it.

    Before returning, ``basic_qos(prefetch_count=1)`` is applied to the
    new channel.
    """
    new_channel = conn.channel()
    new_channel.basic_qos(prefetch_count=1)
    return new_channel
def consume(channel, queue, handler):
    """Register ``handler`` as a non-exclusive consumer of ``queue``.

    ``handler`` is passed positionally to ``channel.basic_consume``,
    together with ``queue=queue`` and ``exclusive=False``. Whatever
    ``basic_consume`` returns is returned unchanged.
    """
    consume_options = {'queue': queue, 'exclusive': False}
    return channel.basic_consume(handler, **consume_options)
def decode(request):
    """Convert keys and values unpacked as bytes to strings.

    ``request`` is unpacked with ``msgpack.unpackb``; the result must
    provide ``.items()``. Every top-level key or value that is ``bytes``
    is decoded as UTF-8; anything else (including nested containers) is
    passed through untouched. Returns a new ``dict``.
    """
    def as_text(item):
        # Only bytes are converted; other types are returned as-is.
        return item.decode('utf-8') if isinstance(item, bytes) else item

    unpacked = msgpack.unpackb(request)
    return {as_text(key): as_text(value) for key, value in unpacked.items()}
def make_receiver(handle, sender):
    """Produce a function that handles messages.

    The returned ``receiver(ch, method, properties, body)`` has the
    signature of a pika consumer callback. For each delivery it:

    1. decodes ``body`` with ``decode``;
    2. calls ``handle(ch, decoded)`` and iterates over what it returns;
    3. passes every result to ``sender(ch, result)``;
    4. acknowledges the delivery using ``method.delivery_tag``.

    ``properties`` is accepted for signature compatibility but not used.
    If ``handle`` or ``sender`` raises, the message is not acknowledged.
    """
    def receiver(ch, method, properties, body):
        # Plain loop: publishing is a side effect, not list construction.
        for result in handle(ch, decode(body)):
            sender(ch, result)
        # basic_ack expects the delivery tag, not the whole method frame.
        ch.basic_ack(delivery_tag=method.delivery_tag)
    return receiver
def encode(result):
    """Serialize ``result`` with ``msgpack.packb`` and return the payload."""
    packed = msgpack.packb(result)
    return packed
def make_sender(exchange, routing_key):
    """Build a publisher bound to ``exchange`` and ``routing_key``.

    The returned ``sender(ch, result)`` encodes ``result`` with ``encode``
    and publishes it on channel ``ch``, returning whatever
    ``ch.basic_publish`` returns. One ``pika.BasicProperties`` instance
    with ``delivery_mode=2`` is created up front and reused for every
    message.
    """
    shared_properties = pika.BasicProperties(delivery_mode=2)

    def sender(ch, result):
        publish_args = {
            'exchange': exchange,
            'routing_key': routing_key,
            'body': encode(result),
            'properties': shared_properties,
        }
        return ch.basic_publish(**publish_args)

    return sender
def worker(host, port, ssl, queue, exchange, routing_key, handler):
    """Create and start a consumer.

    Opens a connection and channel, wires ``handler`` to ``queue`` through
    a receiver that publishes results to ``exchange``/``routing_key``, and
    then blocks in the channel's consume loop.

    Returns whatever ``chan.start_consuming()`` returns once the loop
    ends. On any ``Exception`` the error is logged with its traceback and
    the process exits with status 1. The connection is closed on every
    exit path.
    """
    conn = None
    try:
        conn = open_connection(host, port, ssl)
        chan = open_channel(conn)
        sender = make_sender(exchange, routing_key)
        receiver = make_receiver(handler, sender)
        # basic_consume only registers the callback; the blocking loop is
        # started on the channel itself, not on basic_consume's return value.
        consume(chan, queue, receiver)
        return chan.start_consuming()
    except Exception as e:
        # logger.exception keeps the original message and adds the traceback.
        logger.exception(e)
        sys.exit(1)
    finally:
        # Runs on normal return, on sys.exit and on KeyboardInterrupt alike.
        close_connection(conn)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def run(cfg):
    """Start a worker.

    Reads the RabbitMQ settings from ``cfg`` (keys ``rabbit-host``,
    ``rabbit-port``, ``rabbit-ssl``, ``rabbit-queue``, ``rabbit-exchange``
    and ``rabbit-result-routing-key``) and starts ``worker`` with
    ``spark.run`` as the message handler. Raises ``KeyError`` if any of
    those keys is missing. Returns whatever ``worker`` returns.
    """
    host = cfg['rabbit-host']
    # No trailing comma here: it would turn the port into a 1-tuple.
    port = cfg['rabbit-port']
    ssl = cfg['rabbit-ssl']
    queue = cfg['rabbit-queue']
    exchange = cfg['rabbit-exchange']
    key = cfg['rabbit-result-routing-key']
    return worker(host, port, ssl, queue, exchange, key, spark.run)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment