Created
May 25, 2020 10:15
-
-
Save Lh4cKg/4d80d697f4b376a1161fb3965d084909 to your computer and use it in GitHub Desktop.
Python Multiprocessing Workers Architecture
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from multiprocessing.managers import BaseManager | |
class QueueManager(BaseManager): | |
pass |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import json | |
import logging | |
from .workers import Worker | |
logger = logging.getLogger(__name__) | |
class BaseProcess(Worker): | |
def __init__(self, manager): | |
super().__init__(manager) | |
logger.info('---- START WORKER ----') | |
def df_processing(self, *args, **kwargs): | |
raise NotImplementedError | |
def execute(self): | |
while True: | |
logger.info(f'QUEUE SIZE: {self.queue.qsize()}') | |
value = self.queue.get() | |
if value: | |
s = time.time() | |
try: | |
self.df_processing(value) | |
logger.info(f'QUEUE EXECUTION TIME: {time.time()-s}') | |
except Exception as e: | |
logger.exception(e) | |
else: | |
time.sleep(0.1) | |
class YourProcessClass(BaseProcess): | |
def df_processing(self, value, **kwargs): | |
# write logic here |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
from . import settings | |
from .managers import QueueManager | |
logger = logging.getLogger(__name__) | |
def get_queue(worker_host, worker_port, auth_key): | |
""" | |
:param worker_host: | |
:param worker_port: | |
:param auth_key: | |
:return: | |
""" | |
QueueManager.register('get_queue') | |
global_manager = QueueManager( | |
address=(worker_host, worker_port), | |
authkey=auth_key | |
) | |
global_manager.connect() | |
return global_manager.get_queue() | |
def serve_queue(): | |
""" | |
:return: | |
""" | |
manager = QueueManager( | |
address=( | |
settings.WORKER_ADDRESS_HOST, settings.WORKER_ADDRESS_PORT | |
), | |
authkey=settings.WORKER_AUTH_KEY | |
) | |
server = manager.get_server() | |
logger.info('---- RUNNING SERVER ----') | |
server.serve_forever() | |
logger.info('---- END SERVER ----') | |
def get_server_manager(): | |
QueueManager.register('get_queue') | |
manager = QueueManager( | |
address=( | |
settings.WORKER_ADDRESS_HOST, settings.WORKER_ADDRESS_PORT | |
), | |
authkey=settings.WORKER_AUTH_KEY | |
) | |
manager.connect() | |
return manager |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import logging | |
import multiprocessing as mp | |
from queue import Queue | |
from .managers import QueueManager | |
from .queues import serve_queue, get_server_manager | |
from .workers import init_workers, spawning | |
logger = logging.getLogger(__name__) | |
class Register: | |
def __init__(self, register=None): | |
if register: | |
if callable(register): | |
register() | |
else: | |
logger.info('Register is not callable') | |
self.default_register() | |
else: | |
self.default_register() | |
@staticmethod | |
def default_register(): | |
""" | |
:return: | |
""" | |
queue = Queue() | |
QueueManager.register('get_queue', lambda: queue) | |
mp.Process(target=serve_queue, daemon=True).start() | |
time.sleep(1) | |
kw = {'manager': get_server_manager()} | |
init_workers(**kw) | |
while True: | |
spawning() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
PROCESS_CLASS = 'module_name.process.YourProcessClass' | |
NUMBER_OF_WORKERS = 10 | |
WORKER_ADDRESS_HOST = '127.0.0.1' | |
WORKER_ADDRESS_PORT = 50000 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import time | |
import logging | |
import multiprocessing as mp | |
from ctypes import c_int64 | |
from . import settings | |
from .utils import import_string | |
logger = logging.getLogger(__name__) | |
class Worker: | |
def __init__(self, manager): | |
self.manager = manager | |
self.queue = self.manager.get_queue() | |
def execute(self): | |
raise NotImplementedError | |
def start(self): | |
self.execute() | |
def start_process(**kw): | |
matching = import_string(settings.PROCESS_CLASS)(manager=kw['manager']) | |
mp.Process(target=matching.start, daemon=True).start() | |
def spawning(**kw): | |
pid, status = os.waitpid(-1, os.WNOHANG) | |
if pid: | |
start_process(**kw) | |
else: | |
time.sleep(0.1) | |
def init_workers(**kw): | |
logger.info(f'Number Of Workers: {settings.NUMBER_OF_WORKERS}') | |
for _ in range(settings.NUMBER_OF_WORKERS): | |
start_process(**kw) | |
def create_proxy_objects(m): | |
# global current_time | |
current_time = time.time() | |
return dict(current_time=m.Value(c_int64, current_time)) | |
# if __name__ == '__main__': | |
# queue = Queue() | |
# QueueManager.register('get_queue', lambda: queue) | |
# mp.Process(target=serve_queue, daemon=True).start() | |
# time.sleep(1) | |
# kw = {'manager': get_server_manager()} | |
# init_workers(**kw) | |
# while True: | |
# spawning() | |
# workers_register() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment