Last active
September 23, 2017 04:43
-
-
Save allenyang79/377de1d22072518b43af669e611c2a20 to your computer and use it in GitHub Desktop.
Job/worker pattern in Python
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import os | |
import time | |
import random | |
import logging | |
from multiprocessing import Process, Queue | |
# Configure the root logger once at import time. The format includes the
# process name so that lines from the boss and each worker can be told apart.
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s|%(levelname)4s|%(processName)-10s| %(message)s',
)
logger = logging.getLogger()
def boss(queue):
    """Feed jobs into ``queue`` forever.

    Each round makes five attempts to enqueue a job and then sleeps for ten
    seconds. An attempt is skipped (and ``'full'`` is logged) when the queue
    reports that it is full. A job is a dict with the keys ``fn`` (the
    callable to run), ``args`` and ``kwargs``; the callable is always
    ``task`` and its single positional argument is a ``task-<n>`` label.

    :param queue: queue object providing ``full()`` and ``put()``.
    """
    # Running sequence number for the task labels. It is deliberately kept
    # separate from the loop variable below: sharing one name would reset the
    # number at the start of every round and hand out task-0..task-4 forever.
    next_task_no = 0
    while True:
        for _ in range(5):
            if queue.full():
                logger.info('full')
                continue
            logger.info('assign task')
            queue.put({
                'fn': task,
                'args': ('task-%s' % next_task_no, ),
                'kwargs': {},
            })
            next_task_no += 1
        time.sleep(10)
def worker(worker_id, queue):
    """Pull jobs from ``queue`` forever and execute them one at a time.

    A job is expected to be a dict holding a callable under ``fn`` together
    with its positional ``args`` and keyword ``kwargs``.

    :param worker_id: label written to the log before each ``queue.get()``.
    :param queue: queue object providing ``get()``.
    """
    while True:
        logger.info("handle task: %s", worker_id)
        job = queue.get()
        func = job['fn']
        positional = job['args']
        keyword = job['kwargs']
        func(*positional, **keyword)
def task(task_id):
    """Simulate a unit of work lasting a random 5 to 15 seconds.

    Logs a start line, one progress line per elapsed second, and a done line.

    :param task_id: label included in every log line.
    """
    logger.info("task: %s start", task_id)
    steps = random.randint(5, 15)
    for _ in range(steps):
        time.sleep(1)
        logger.info("task: %s process", task_id)
    logger.info("task: %s done", task_id)
def memory_usage_psutil():
    """Return the memory usage of the current process in MB.

    The value is the first field of psutil's ``memory_info()`` result
    (presumably the resident set size -- confirm against the psutil version
    in use) divided by 2**20.
    """
    # Imported lazily so the module still loads when psutil is not installed.
    import psutil

    bytes_per_mb = float(2 ** 20)
    current = psutil.Process(os.getpid())
    return current.memory_info()[0] / bytes_per_mb
def _ulimit():
    """Try to raise the open-file limit, then log the current resource limits.

    Both the soft and the hard ``RLIMIT_NOFILE`` limit are requested as
    400000. A refused request is logged as an error and otherwise ignored.
    Afterwards the soft/hard pair of every limit in the table below is
    logged at WARNING level.
    """
    import resource

    # (attribute name on the ``resource`` module, human-readable description)
    ulimit_list = [
        ('RLIMIT_CORE', 'core file size'),
        ('RLIMIT_CPU', 'CPU time'),
        ('RLIMIT_FSIZE', 'file size'),
        ('RLIMIT_DATA', 'heap size'),
        ('RLIMIT_STACK', 'stack size'),
        ('RLIMIT_RSS', 'resident set size'),
        ('RLIMIT_NPROC', 'number of processes'),
        ('RLIMIT_NOFILE', 'number of open files'),
        ('RLIMIT_MEMLOCK', 'lockable memory address'),
    ]

    try:
        resource.setrlimit(resource.RLIMIT_NOFILE, (400000, 400000))
    except (ValueError, resource.error) as e:
        # ValueError: the requested limits are not permitted for this
        # process; resource.error: the underlying system call failed.
        logger.error('unable to setrlimit: e: %s', e)

    for name, desc in ulimit_list:
        limit_num = getattr(resource, name, None)
        if limit_num is None:
            # Not every platform defines every RLIMIT_* constant.
            logger.debug('%s is not defined on this platform', name)
            continue
        soft, hard = resource.getrlimit(limit_num)
        logger.warning('after setrlimit: Maximum %-25s (%-15s) : %20s %20s', desc, name, soft, hard)
def main():
    """Start one boss and three worker processes, then run the watchdog.

    All processes share a single queue created with a maximum size of five.
    The call does not return, because ``watchdog()`` loops forever.
    """
    jobs = Queue(5)

    # boss
    producer = Process(target=boss, args=(jobs,))
    producer.start()

    # worker
    for label in ('worker-01', 'worker-02', 'worker-03'):
        Process(target=worker, args=(label, jobs)).start()

    watchdog()
def watchdog():
    """Call ``_ulimit()`` once a minute, forever, in the calling process."""
    poll_seconds = 60
    while True:
        time.sleep(poll_seconds)
        # memory_usage_psutil() can be logged here as well when psutil is
        # installed.
        _ulimit()
# Start the demo only when run as a script, not when the module is imported.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment