Skip to content

Instantly share code, notes, and snippets.

@siteshen
Created February 25, 2014 10:37
Show Gist options
  • Select an option

  • Save siteshen/9206619 to your computer and use it in GitHub Desktop.

Select an option

Save siteshen/9206619 to your computer and use it in GitHub Desktop.
A simple threading-based task scheduler.
from Queue import Queue, PriorityQueue
from threading import Thread
class Scheduler(Thread):
    """Dispatcher thread that pairs queued tasks with pooled workers.

    Tasks wait in a FIFO ``Queue``; workers wait in a ``PriorityQueue``, so
    the objects added with ``add_worker`` must be orderable against each
    other. The thread's main loop repeatedly takes one task and one worker
    and hands the task to that worker.
    """

    def __init__(self, *args, **kwargs):
        """Create the empty task queue and worker pool.

        All positional and keyword arguments are forwarded unchanged to
        ``Thread.__init__``.
        """
        super(Scheduler, self).__init__(*args, **kwargs)
        # FIFO backlog of tasks that have not been dispatched yet.
        self.tasks = Queue()
        # Pool of workers ready for a task; smallest-comparing one is
        # retrieved first.
        self.workers = PriorityQueue()

    def add_task(self, task):
        """Append ``task`` to the backlog of tasks awaiting dispatch."""
        self.tasks.put(task)

    def add_worker(self, worker):
        """Place ``worker`` in the pool of workers ready to take a task."""
        self.workers.put(worker)

    def run(self):
        """Dispatch forever; this loop never returns.

        Each iteration blocks until a task is available, then blocks until
        a worker is available, and passes the task to the worker through
        its ``handle_task`` method.
        """
        while True:
            # Order matters: the task is dequeued before a worker is taken.
            pending = self.tasks.get()
            chosen = self.workers.get()
            chosen.handle_task(pending)
class Worker(Thread):
    """Thread that processes tasks from its own private queue.

    Subclasses implement ``work``. After each task is processed the worker
    hands itself back to its scheduler (when one was given) by calling the
    scheduler's ``add_worker`` method.
    """

    def __init__(self, scheduler=None, *args, **kwargs):
        """Create the worker with an empty task queue.

        Args:
            scheduler: Optional object exposing ``add_worker(worker)``. When
                omitted (``None``) the worker simply keeps draining its own
                queue and never re-registers anywhere.
            *args, **kwargs: Forwarded unchanged to ``Thread.__init__``.
        """
        super(Worker, self).__init__(*args, **kwargs)
        self.tasks = Queue()
        self.scheduler = scheduler

    def __lt__(self, worker):
        """Order workers by backlog: fewer queued tasks compares as smaller.

        This is what lets workers be stored in a priority queue. Note that
        ``qsize()`` is only an approximate snapshot while other threads are
        putting or getting items.
        """
        return self.tasks.qsize() < worker.tasks.qsize()

    def handle_task(self, task):
        """Queue ``task`` for this worker; it is processed by ``run``."""
        self.tasks.put(task)

    def run(self):
        """Process queued tasks forever; this loop never returns normally.

        Each iteration blocks for the next task, passes it to ``work``, and
        then returns this worker to the scheduler's pool. An exception
        raised by ``work`` is not caught here: it propagates out of the
        loop, ending the thread without re-registering the worker.
        """
        while True:
            task = self.tasks.get()
            self.work(task)
            # ``scheduler`` defaults to None; without this guard a
            # standalone worker died with AttributeError after its first
            # task and silently stopped processing its queue.
            if self.scheduler is not None:
                self.scheduler.add_worker(self)

    def work(self, task):
        """Process a single task; subclasses must override this.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment