Skip to content

Instantly share code, notes, and snippets.

@allenyang79
Created March 7, 2019 05:59
Show Gist options
  • Save allenyang79/76fb28c1bf6080b3107822bbe4cd1faa to your computer and use it in GitHub Desktop.
Save allenyang79/76fb28c1bf6080b3107822bbe4cd1faa to your computer and use it in GitHub Desktop.
python threading job-queue
import logging
import multiprocessing as mp
import os
import random
import threading
import time
import traceback

try:
    import Queue
except ImportError:
    # Python 3 renamed the module to lowercase ``queue``; keep the old alias
    # so ``Queue.Queue`` resolves on both major versions.
    import queue as Queue
# Layout of every log line: timestamp, level (padded/truncated to 5 chars),
# process and thread names, then the call site and the message itself.
log_formatter_str = (
    '%(asctime)s [%(levelname)-5.5s][%(processName)s][%(threadName)s] '
    '%(module)s#%(funcName)s@%(lineno)d: %(message)s'
)

# Configure the root logger once, at DEBUG verbosity, with the layout above.
logging.basicConfig(
    format=log_formatter_str,
    datefmt='%Y-%m-%d %H:%M:%S',
    level=logging.DEBUG,
)

# The root logger is shared by everything in this module.
logger = logging.getLogger()
class Task(object):
    """A unit of work: a callable, its arguments, and the recorded outcome.

    Attributes:
        fn: The callable to invoke.
        args: Positional arguments passed to ``fn``.
        kwargs: Keyword arguments passed to ``fn``.
        state: Starts as ``'pending'``; whoever executes the task updates it.
        error: Starts as ``None``; holds failure details once recorded.
        result: Starts as ``None``; holds the return value once recorded.
    """

    def __init__(self, fn, args=(), kwargs=None):
        """Store the callable and its arguments in the ``'pending'`` state.

        Args:
            fn: The callable to run.
            args: Positional arguments for ``fn``; defaults to none.
            kwargs: Keyword arguments for ``fn``; defaults to none.
        """
        self.fn = fn
        # Normalise ``None`` so ``fn(*task.args, **task.kwargs)`` always works.
        self.args = () if args is None else args
        self.kwargs = {} if kwargs is None else kwargs
        self.state = 'pending'
        self.error = None
        self.result = None

    def __repr__(self):
        """Return a short description naming the callable and current state."""
        fn_name = getattr(self.fn, '__name__', None) or repr(self.fn)
        return '<%s fn=%s state=%s>' % (type(self).__name__, fn_name, self.state)
class TaskQueue(Queue.Queue):
    """A FIFO queue of tasks drained by a pool of daemon worker threads.

    Enqueue work with ``add_task``, spawn the workers with ``start``, then
    call the inherited ``join`` to block until every queued task has been
    processed.
    """

    def __init__(self, num_workers=1):
        """Create an empty queue.

        Args:
            num_workers: How many worker threads ``start_workers`` keeps
                running.
        """
        Queue.Queue.__init__(self)
        self.num_workers = num_workers
        self.workers = []

    def start(self):
        """Log the start-up and make sure the worker threads are running."""
        logger.info("TaskQueue start")
        self.start_workers()

    def add_task(self, fn, *args, **kwargs):
        """Wrap ``fn`` and its arguments in a ``Task`` and enqueue it.

        Args:
            fn: The callable a worker will invoke.
            *args: Positional arguments for ``fn``.
            **kwargs: Keyword arguments for ``fn``.

        Returns:
            The queued ``Task``; its ``state``, ``result`` and ``error`` are
            filled in by the worker that runs it.
        """
        # ``*args`` / ``**kwargs`` are always a tuple / dict, never None, so
        # they can be handed over as-is.
        task = Task(fn, args, kwargs)
        self.put(task)
        return task

    def start_workers(self):
        """Spawn daemon threads until ``num_workers`` of them are alive.

        Safe to call more than once: only the shortfall is started, so a
        repeated ``start()`` does not double the size of the pool.
        """
        self.workers = [t for t in self.workers if t.is_alive()]
        for _ in range(self.num_workers - len(self.workers)):
            thread = threading.Thread(target=self.worker)
            # Daemon threads do not keep the interpreter alive at exit.
            thread.daemon = True
            thread.start()
            self.workers.append(thread)

    def worker(self):
        """Consume tasks forever, recording each outcome on the task itself.

        On success the task gets ``state='done'`` and its ``result``; on
        failure it gets ``state='error'`` and the formatted traceback in
        ``error``.
        """
        while True:
            task = self.get()
            task.state = 'running'
            try:
                task.result = task.fn(*task.args, **task.kwargs)
            except BaseException:
                # Deliberately as broad as a bare ``except``: a task that
                # raises SystemExit must not kill this worker and strand the
                # tasks still queued behind it.
                # Store the traceback before flipping the state so a reader
                # that sees 'error' always finds ``error`` populated.
                task.error = traceback.format_exc()
                task.state = 'error'
            else:
                task.state = 'done'
            finally:
                # Always balance the get(), otherwise join() blocks forever.
                self.task_done()
def task_func(a):
    """Demo workload: fail for 5, otherwise dawdle a random while and square.

    Args:
        a: The value to square.

    Returns:
        ``a * a``.

    Raises:
        Exception: With message ``"oops"`` when ``a == 5``.
    """
    if a == 5:
        raise Exception("oops")

    # Between 0 and 30 simulated work steps, each logged and 0.1 s long.
    steps = random.randint(0, 30)
    step = 0
    while step < steps:
        logger.info("working, %s", step)
        time.sleep(0.1)
        step += 1

    return a * a
# Demo driver. Guarded so that importing this module (for example to reuse
# TaskQueue) does not kick off ten sleeping tasks as an import side effect.
if __name__ == "__main__":
    q = TaskQueue(num_workers=10)

    # Queue every task before any worker exists; they are picked up as soon
    # as start() spawns the threads.
    tasks = [q.add_task(task_func, i) for i in range(0, 10)]

    q.start()
    q.join()  # block until all tasks are done

    # Report the outcome of each task; the one called with 5 ends in 'error'.
    for task in tasks:
        print(task, task.state, task.result, task.error)
    logger.info("All done!")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment