Created
June 12, 2019 22:54
-
-
Save jayrbolton/7fbef5cc990257376a4f45cf84c9d741 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import zmq | |
import atexit | |
import logging | |
from zmq.devices import ProcessDevice | |
from zmq.log.handlers import PUBHandler | |
from multiprocessing import Process | |
# All child processes we create | |
procs = [] # type: list | |
# Socket path for communication between the http server and job runner | |
_SOCKET_PATH = '/tmp/jobrunner' | |
def main(): | |
"""Parent process that launches http server and job runner.""" | |
server_port = 5559 | |
job_runner_port = 5560 | |
log_port = 5561 | |
# Track processes we start for cleanup | |
procs = [] # type: list | |
def cleanup(): | |
print("Shutting down child processes..") | |
for p in procs: | |
p.kill() | |
atexit.register(cleanup) | |
num_workers = 2 # number of job runners | |
# Message queue between services | |
queue_device = ProcessDevice(zmq.QUEUE, zmq.XREP, zmq.XREQ) | |
queue_device.bind_in(f'ipc://{_SOCKET_PATH}:{server_port}') # requests | |
queue_device.bind_out(f'ipc://{_SOCKET_PATH}:{job_runner_port}') # replies | |
queue_device.start() | |
# Start the HTTP server | |
server_proc = Process(target=server, args=(server_port, log_port)) | |
procs.append(server_proc) | |
server_proc.start() | |
# Start the job runners | |
for _ in range(num_workers): | |
proc = Process(target=job_runner, args=(job_runner_port, log_port)) | |
procs.append(proc) | |
proc.start() | |
# Listen to the log socket | |
log_listener(log_port) | |
def job_runner(port, log_port): | |
"""Mock job runner process.""" | |
logger = _get_logger(log_port) | |
context = zmq.Context() | |
socket = context.socket(zmq.REP) # send replies to callback server | |
socket.connect(f"ipc:///{_SOCKET_PATH}:{port}") | |
while True: | |
request = socket.recv() # block for request messages | |
logger.info(f"Recieved request from http server: {request}") | |
# Do some work | |
time.sleep(5) | |
# Send a response | |
socket.send_string("Response from the job runner goes here.") | |
def server(port, log_port): | |
"""Mock HTTP server process.""" | |
logger = _get_logger(log_port) | |
context = zmq.Context() | |
socket = context.socket(zmq.REQ) # send requests to job runner | |
socket.connect(f'ipc:///{_SOCKET_PATH}:{port}') | |
while True: | |
# Send a message to the job runner | |
socket.send_string("Request from client (eg. start job, status, etc)") | |
logger.info('Sent string to job runner') | |
reply = socket.recv() # blocks | |
logger.info(f"Received reply from job runner: {reply}") | |
def _get_logger(port): | |
# Some basic inter-node logging | |
# Publishes python `logging` messages over tcp to one or more subscribers | |
context = zmq.Context() | |
pub = context.socket(zmq.PUB) | |
pub.connect('tcp://127.0.0.1:' + str(port)) | |
handler = PUBHandler(pub) | |
handler.root_topic = 'example' | |
logger = logging.getLogger('example') | |
logger.setLevel(logging.INFO) | |
logger.addHandler(handler) | |
return logger | |
def log_listener(port): | |
"""Print all messages to the logging socket to stdout.""" | |
context = zmq.Context() | |
sub = context.socket(zmq.SUB) | |
sub.setsockopt_string(zmq.SUBSCRIBE, 'example') | |
sub.bind(f'tcp://127.0.0.1:{port}') | |
while True: | |
print(sub.recv_multipart()) | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment