Skip to content

Instantly share code, notes, and snippets.

@jayrbolton
Created June 12, 2019 22:54
Show Gist options
  • Save jayrbolton/7fbef5cc990257376a4f45cf84c9d741 to your computer and use it in GitHub Desktop.
Save jayrbolton/7fbef5cc990257376a4f45cf84c9d741 to your computer and use it in GitHub Desktop.
import time
import zmq
import atexit
import logging
from zmq.devices import ProcessDevice
from zmq.log.handlers import PUBHandler
from multiprocessing import Process
# Child processes created by this module.
# NOTE: main() currently keeps its own local list of the same name.
procs: list = []
# Filesystem prefix of the ipc sockets linking the http server and job runner
_SOCKET_PATH = '/tmp/jobrunner'
def main():
    """Parent process that launches the http server and the job runners.

    Starts a zmq queue device (run in a background process by
    ``ProcessDevice``) bound to two ipc endpoints derived from
    ``_SOCKET_PATH``, then one mock http server process and ``num_workers``
    mock job runner processes. Finally blocks in ``log_listener``, printing
    the log messages published by the children.

    An ``atexit`` hook kills and reaps every child process that was started.
    """
    server_port = 5559
    job_runner_port = 5560
    log_port = 5561
    num_workers = 2  # number of job runners
    # Track processes we start for cleanup
    procs = []  # type: list

    def cleanup():
        """Kill any child that is still running and wait for it to exit."""
        print("Shutting down child processes..")
        for p in procs:
            # A child may already have exited on its own; only signal live ones.
            if p.is_alive():
                p.kill()
            # Join so the child is reaped before the parent exits.
            p.join()

    atexit.register(cleanup)
    # Message queue between services
    queue_device = ProcessDevice(zmq.QUEUE, zmq.XREP, zmq.XREQ)
    queue_device.bind_in(f'ipc://{_SOCKET_PATH}:{server_port}')  # requests
    queue_device.bind_out(f'ipc://{_SOCKET_PATH}:{job_runner_port}')  # replies
    queue_device.start()
    # Start the HTTP server.
    # Processes are registered only after start() succeeds, so cleanup()
    # never sees a process that was not started.
    server_proc = Process(target=server, args=(server_port, log_port))
    server_proc.start()
    procs.append(server_proc)
    # Start the job runners
    for _ in range(num_workers):
        proc = Process(target=job_runner, args=(job_runner_port, log_port))
        proc.start()
        procs.append(proc)
    # Listen to the log socket (blocks forever)
    log_listener(log_port)
def job_runner(port, log_port):
    """Mock job runner process.

    Connects a REP socket to ``ipc://{_SOCKET_PATH}:{port}`` and loops
    forever: block for a request, log it, sleep five seconds to simulate
    work, then send a canned reply.

    Args:
        port: Suffix appended to ``_SOCKET_PATH`` to form the ipc endpoint.
        log_port: Port handed to ``_get_logger`` for publishing log messages.
    """
    logger = _get_logger(log_port)
    context = zmq.Context()
    socket = context.socket(zmq.REP)  # send replies to callback server
    # Use the same endpoint form as the bind side in main(). The earlier
    # 'ipc:///' prefix expanded to a path with a doubled leading slash, which
    # only matched the bound socket because the OS collapses '//tmp'.
    socket.connect(f"ipc://{_SOCKET_PATH}:{port}")
    while True:
        request = socket.recv()  # block for request messages
        logger.info(f"Recieved request from http server: {request}")
        # Do some work
        time.sleep(5)
        # Send a response
        socket.send_string("Response from the job runner goes here.")
def server(port, log_port):
    """Mock HTTP server process.

    Connects a REQ socket to ``ipc://{_SOCKET_PATH}:{port}`` and loops
    forever: send a canned request, log that it was sent, block until the
    reply arrives, then log the reply.

    Args:
        port: Suffix appended to ``_SOCKET_PATH`` to form the ipc endpoint.
        log_port: Port handed to ``_get_logger`` for publishing log messages.
    """
    logger = _get_logger(log_port)
    context = zmq.Context()
    socket = context.socket(zmq.REQ)  # send requests to job runner
    # Use the same endpoint form as the bind side in main(). The earlier
    # 'ipc:///' prefix expanded to a path with a doubled leading slash, which
    # only matched the bound socket because the OS collapses '//tmp'.
    socket.connect(f'ipc://{_SOCKET_PATH}:{port}')
    while True:
        # Send a message to the job runner
        socket.send_string("Request from client (eg. start job, status, etc)")
        logger.info('Sent string to job runner')
        reply = socket.recv()  # blocks
        logger.info(f"Received reply from job runner: {reply}")
def _get_logger(port):
    """Return the 'example' logger with a zmq PUB handler attached.

    Records logged at INFO or above are published over tcp to
    127.0.0.1:<port>, where one or more subscribers can pick them up.

    Args:
        port: TCP port on localhost that the PUB socket connects to.
    """
    zmq_context = zmq.Context()
    publisher = zmq_context.socket(zmq.PUB)
    publisher.connect(f'tcp://127.0.0.1:{port!s}')
    # Route python `logging` records through the PUB socket
    pub_handler = PUBHandler(publisher)
    pub_handler.root_topic = 'example'
    log = logging.getLogger('example')
    log.setLevel(logging.INFO)
    log.addHandler(pub_handler)
    return log
def log_listener(port):
    """Echo every message arriving on the logging socket to stdout.

    Binds a SUB socket on tcp://127.0.0.1:<port>, subscribed to the
    'example' topic prefix, and prints each multipart message as it is
    received. Never returns.

    Args:
        port: TCP port on localhost to bind the SUB socket to.
    """
    zmq_context = zmq.Context()
    subscriber = zmq_context.socket(zmq.SUB)
    subscriber.setsockopt_string(zmq.SUBSCRIBE, 'example')
    subscriber.bind(f'tcp://127.0.0.1:{port}')
    while True:
        frames = subscriber.recv_multipart()  # blocks until a message arrives
        print(frames)
# Run main() only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment