Skip to content

Instantly share code, notes, and snippets.

@allenyang79
Last active September 23, 2017 04:43
Show Gist options
  • Save allenyang79/377de1d22072518b43af669e611c2a20 to your computer and use it in GitHub Desktop.
Save allenyang79/377de1d22072518b43af669e611c2a20 to your computer and use it in GitHub Desktop.
job/worker pattern of python
# -*- coding: utf-8 -*-
import os
import time
import random
import logging
from multiprocessing import Process, Queue
#formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s|%(levelname)4s|%(processName)-10s| %(message)s')
logger = logging.getLogger()
def boss(queue):
"""Boss is responsible to assign job to each queue
"""
i = 0
while True:
for i in range(0, 5):
if queue.full():
logger.info('full')
else:
logger.info('assign task')
queue.put({
'fn': task,
'args': ('task-%s' % i, ),
'kwargs': {}
})
i += 1
time.sleep(10)
def worker(worker_id, queue):
"""Worker will focus on specific queue to handle.
"""
while True:
logger.info("handle task: %s", worker_id)
job = queue.get()
job['fn'](*job['args'], **job['kwargs'])
def task(task_id):
logger.info("task: %s start", task_id)
for i in range(0, random.randint(5, 15)):
time.sleep(1)
logger.info("task: %s process", task_id)
logger.info("task: %s done", task_id)
def memory_usage_psutil():
# return the memory usage in MB
import psutil
process = psutil.Process(os.getpid())
mem = process.memory_info()[0] / float(2 ** 20)
return mem
def _ulimit():
import resource
ulimit_list = [
('RLIMIT_CORE', 'core file size'),
('RLIMIT_CPU', 'CPU time'),
('RLIMIT_FSIZE', 'file size'),
('RLIMIT_DATA', 'heap size'),
('RLIMIT_STACK', 'stack size'),
('RLIMIT_RSS', 'resident set size'),
('RLIMIT_NPROC', 'number of processes'),
('RLIMIT_NOFILE', 'number of open files'),
('RLIMIT_MEMLOCK', 'lockable memory address'),
]
for name, desc in ulimit_list:
limit_num = getattr(resource, name)
soft, hard = resource.getrlimit(limit_num)
try:
resource.setrlimit(resource.RLIMIT_NOFILE, (400000, 400000))
except Exception as e:
logger.error('unable to setrlimit: e: %s', e)
for name, desc in ulimit_list:
limit_num = getattr(resource, name)
soft, hard = resource.getrlimit(limit_num)
logger.warning('after setrlimit: Maximum %-25s (%-15s) : %20s %20s', desc, name, soft, hard)
def main():
queue = Queue(5)
# boss
Process(target=boss, args=(queue,)).start()
# worker
worker_id = 'worker-01'
Process(target=worker, args=(worker_id, queue,)).start()
worker_id = 'worker-02'
Process(target=worker, args=(worker_id, queue,)).start()
worker_id = 'worker-03'
Process(target=worker, args=(worker_id, queue,)).start()
watchdog()
def watchdog():
while True:
time.sleep(60)
#print memory_usage_psutil()
_ulimit()
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment