Skip to content

Instantly share code, notes, and snippets.

@wallabra
Last active September 21, 2018 16:52
Show Gist options
  • Save wallabra/66d9de888fa3a5b2632fb6304ff68875 to your computer and use it in GitHub Desktop.
Save wallabra/66d9de888fa3a5b2632fb6304ff68875 to your computer and use it in GitHub Desktop.
Python 3 Taskman
import collections
import functools
import time
class TaskList(object):
    """A named FIFO queue of task callables bound to one tasker.

    Every task is a callable that accepts the tasker as its only argument.
    """

    def __init__(self, tasker, name=None):
        """Create an empty queue.

        Args:
            tasker: Object handed to every task when it is invoked.
            name: Optional identifier for this queue.
        """
        self.tasker = tasker
        self.name = name
        self.tasks = collections.deque()

    def add(self, task):
        """Append *task* to the back of the queue."""
        self.tasks.append(task)

    def next(self):
        """Pop the task at the front of the queue, call it and return its result.

        Raises:
            StopIteration: If the queue is empty. Whatever the task itself
                raises (including StopIteration) propagates unchanged.
        """
        try:
            task = self.tasks.popleft()
        except IndexError:
            raise StopIteration
        return task(self.tasker)

    def run(self):
        """Run tasks from the front of the queue until it is empty.

        Tasks may push themselves (or other tasks) back onto the queue while
        running; those are executed as well.
        """
        # collections.deque has no empty() method; an empty deque is falsy.
        while self.tasks:
            self.next()
class DefaultTask(object):
    """One-shot task: a function plus the arguments it will be called with."""

    def __init__(self, queue, func, *args, **kwargs):
        """Remember the owning queue, the function and its call arguments."""
        self.queue, self.func = queue, func
        # *args always arrives as a tuple, so it can be stored directly.
        self.args = args
        self.kwargs = kwargs

    def __call__(self, tasker):
        """Call the function with the tasker first, then the stored arguments."""
        positional = [tasker, *self.args]
        return self.func(*positional, **self.kwargs)
class WhileTask(object):
    """Repeating task that keeps re-queuing itself while a predicate holds."""

    def __init__(self, queue, predicate, func, *args, **kwargs):
        """Store the owning queue, the loop condition, the body and its arguments."""
        self.queue, self.predicate, self.func = queue, predicate, func
        # *args always arrives as a tuple, so it can be stored directly.
        self.args = args
        self.kwargs = kwargs

    def __call__(self, tasker):
        """Run one iteration of the loop.

        The predicate is called with no arguments. When it is falsy the loop
        is over and StopIteration is raised; otherwise the body runs and this
        task is pushed back onto the front of its queue.
        """
        if not self.predicate():
            raise StopIteration
        outcome = self.func(tasker, *self.args, **self.kwargs)
        # Re-queue only after the body finished without raising.
        self.queue.tasks.appendleft(self)
        return outcome
class ForTask(object):
    """Repeating task that calls a function once per item of an iterable."""

    def __init__(self, queue, iterable, func, *args, **kwargs):
        """Store the owning queue, an iterator over *iterable*, the body and its arguments.

        Args:
            queue: Object with a ``tasks`` deque onto which this task re-queues itself.
            iterable: Source of items; ``iter()`` is taken once, here.
            func: Called as ``func(tasker, item, *args, **kwargs)``.
        """
        self.queue = queue
        self.iterator = iter(iterable)
        self.func = func
        self.args = tuple(args)
        self.kwargs = kwargs

    def __call__(self, tasker):
        """Process the next item and push this task back onto the front of its queue.

        Raises:
            StopIteration: When the iterable is exhausted; the task is then
                not re-queued.
        """
        # Python 3 iterators expose __next__, not .next(); use the builtin.
        item = next(self.iterator)
        result = self.func(tasker, item, *self.args, **self.kwargs)
        self.queue.tasks.appendleft(self)
        return result
class TaskQueueManager(object):
    """Cooperative scheduler that interleaves tasks from several named queues.

    Each queue has a due time, in ``time.time()`` seconds: the moment its
    last task was started (``run_times``), plus the delay accumulated through
    :meth:`interval` (``intervals``), plus the delay added with
    :meth:`schedule` (``intrinsic_delay``). The run loops repeatedly pick the
    queue with the earliest due time, sleep until then if necessary, and
    execute that queue's next task.
    """

    def __init__(self):
        self.task_queues = {}      # queue name -> queue object
        self._stop = False         # set by stop(), cleared when a run loop exits
        self.intervals = {}        # queue name -> delay accumulated via interval()
        self.run_times = {}        # queue name -> start time of its last task
        self.intrinsic_delay = {}  # queue name -> delay added via schedule()
        self._current = None       # name of the queue whose task is running

    def interval(self, amount):
        """Add *amount* seconds to the current queue's delay before its next task.

        The run loops zero this delay right before each task starts, so a
        task calls this on the tasker it receives to postpone its queue.

        Raises:
            KeyError: If no queue has been run yet.
        """
        self.intervals[self._current] += amount

    def create(self, name):
        """Create (or replace) the queue *name* and reset its delays."""
        self.task_queues[name] = TaskList(self, name)
        self.intervals[name] = 0
        self.intrinsic_delay[name] = 0
        # Gives queues created by a task in mid-run a due time as well.
        self.run_times[name] = time.time()

    def get(self, name):
        """Return the queue called *name*; raises KeyError if it does not exist."""
        return self.task_queues[name]

    def _ensure_queue(self, name):
        """Return the queue called *name*, creating it first if needed."""
        if name not in self.task_queues:
            self.create(name)
        return self.task_queues[name]

    def add(self, name, func, *args, **kwargs):
        """Queue a one-shot call ``func(tasker, *args, **kwargs)`` on *name*."""
        queue = self._ensure_queue(name)
        queue.add(DefaultTask(queue, func, *args, **kwargs))

    def add_while(self, name, predicate, func, *args, **kwargs):
        """Queue ``func(tasker, *args, **kwargs)`` to repeat while ``predicate()`` is truthy."""
        queue = self._ensure_queue(name)
        queue.add(WhileTask(queue, predicate, func, *args, **kwargs))

    def add_for(self, name, iterable, func, *args, **kwargs):
        """Queue ``func(tasker, item, *args, **kwargs)`` once per item of *iterable*."""
        queue = self._ensure_queue(name)
        queue.add(ForTask(queue, iterable, func, *args, **kwargs))

    def schedule(self, name, delay):
        """Postpone the next task of queue *name* by *delay* seconds.

        Raises:
            KeyError: If the queue does not exist.
        """
        self.intrinsic_delay[name] += delay

    def stop(self):
        """Ask the running loop to exit before it starts another task."""
        self._stop = True

    def max_times(self):
        """Return a dict mapping each live queue name to its due time."""
        return {
            name: self.intervals[name] + self.run_times[name] + self.intrinsic_delay[name]
            for name in self.task_queues
        }

    def run_for(self, duration):
        """Run tasks in due-time order for at most *duration* seconds.

        No task is started once the deadline has passed, and the loop never
        sleeps beyond the deadline. It also ends when :meth:`stop` is called
        or when no queues remain.

        Returns:
            dict mapping queue name to the list of results its tasks returned.
        """
        return self._run_loop(duration)

    def run(self):
        """Run tasks in due-time order until :meth:`stop` is called or no queues remain.

        Returns:
            dict mapping queue name to the list of results its tasks returned.
        """
        return self._run_loop(None)

    def _run_loop(self, duration):
        """Shared scheduling loop; *duration* is None for an unbounded run."""
        started = time.time()
        self.run_times = {name: started for name in self.task_queues}
        results = {name: [] for name in self.task_queues}
        deadline = None if duration is None else time.time() + duration

        # An empty queue table means there is nothing left to pick from.
        while not self._stop and self.task_queues:
            due = self.max_times()
            name = min(due, key=due.get)
            queue = self.task_queues[name]

            if deadline is not None and due[name] >= deadline:
                # Nothing becomes due in time: wait out the remainder, then finish.
                remaining = deadline - time.time()
                if remaining > 0:
                    time.sleep(remaining)
                break

            wait = due[name] - time.time()
            if wait > 0:
                time.sleep(wait)
            if deadline is not None and time.time() >= deadline:
                break
            if self._stop:
                break

            self._current = name
            # The task may call interval() to set the delay before the next one.
            self.intervals[name] = 0.0
            self.run_times[name] = time.time()
            try:
                result = queue.next()
            except StopIteration:
                # The queue ran dry or its loop task finished: retire the queue.
                self.task_queues.pop(name, None)
            else:
                # setdefault covers queues that were created after the run began.
                results.setdefault(name, []).append(result)
            # A schedule() delay applies to a single task only.
            self.intrinsic_delay[name] = 0

        self._stop = False
        return results
def taskify(f):
    """Adapt a plain callable to the task signature by dropping the tasker.

    The returned function accepts the tasker as its first argument, ignores
    it, and forwards all remaining arguments to *f*.
    """
    # wraps() keeps f's name and docstring on the adapter.
    @functools.wraps(f)
    def __inner__(tasker, *args, **kwargs):
        return f(*args, **kwargs)
    return __inner__
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment