Last active
September 21, 2018 16:52
-
-
Save wallabra/66d9de888fa3a5b2632fb6304ff68875 to your computer and use it in GitHub Desktop.
Python 3 Taskman
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import collections | |
class TaskList(object):
    """FIFO queue of task callables that are executed on behalf of a tasker.

    Each task is a callable taking the tasker as its only argument. Tasks
    are stored in ``self.tasks`` (a ``collections.deque``) and consumed
    from the left.
    """

    def __init__(self, tasker, name=None):
        """Create an empty task list.

        Args:
            tasker: Object passed as the single argument to every task.
            name: Optional identifier for this list.
        """
        self.tasker = tasker
        self.name = name
        self.tasks = collections.deque()

    def add(self, task):
        """Append ``task`` (a callable accepting the tasker) to the end."""
        self.tasks.append(task)

    def next(self):
        """Pop the oldest task, call it with the tasker and return its result.

        Raises:
            StopIteration: If the list is empty. Any exception raised by
                the task itself propagates unchanged.
        """
        try:
            task = self.tasks.popleft()
        except IndexError:
            raise StopIteration
        return task(self.tasker)

    def run(self):
        """Execute tasks until the list is empty.

        Tasks may re-queue themselves or add new tasks while running; the
        loop keeps going for as long as anything is left in ``self.tasks``.
        """
        # A deque has no ``empty()`` method; its truthiness reflects
        # whether any tasks remain.
        while self.tasks:
            self.next()
class DefaultTask(object):
    """One-shot task: calls a function once with pre-bound arguments."""

    def __init__(self, queue, func, *args, **kwargs):
        """Remember the owning queue, the callable and its bound arguments.

        Args:
            queue: The queue this task belongs to (stored, not used here).
            func: Callable invoked as ``func(tasker, *args, **kwargs)``.
            *args: Extra positional arguments for ``func``.
            **kwargs: Extra keyword arguments for ``func``.
        """
        self.queue, self.func = queue, func
        self.args, self.kwargs = tuple(args), kwargs

    def __call__(self, tasker):
        """Invoke the stored callable with ``tasker`` first; return its result."""
        callback = self.func
        bound_positional = self.args
        bound_keywords = self.kwargs
        return callback(tasker, *bound_positional, **bound_keywords)
class WhileTask(object):
    """Repeating task: runs a function for as long as a predicate holds."""

    def __init__(self, queue, predicate, func, *args, **kwargs):
        """Store the loop condition, the body callable and its arguments.

        Args:
            queue: Owning queue; the task puts itself back at the front of
                ``queue.tasks`` after every successful step.
            predicate: Zero-argument callable; a truthy result means "keep
                going".
            func: Callable invoked as ``func(tasker, *args, **kwargs)``.
            *args: Extra positional arguments for ``func``.
            **kwargs: Extra keyword arguments for ``func``.
        """
        self.queue, self.predicate, self.func = queue, predicate, func
        self.args, self.kwargs = tuple(args), kwargs

    def __call__(self, tasker):
        """Run one step of the loop.

        Returns:
            Whatever ``func`` returned for this step.

        Raises:
            StopIteration: When the predicate is falsy; the task is then
                not put back on the queue.
        """
        if not self.predicate():
            raise StopIteration
        outcome = self.func(tasker, *self.args, **self.kwargs)
        # Re-queue at the front only after the body ran without raising.
        self.queue.tasks.appendleft(self)
        return outcome
class ForTask(object):
    """Repeating task: calls a function once per item of an iterable."""

    def __init__(self, queue, iterable, func, *args, **kwargs):
        """Store the iteration source, the body callable and its arguments.

        Args:
            queue: Owning queue; the task puts itself back at the front of
                ``queue.tasks`` after every processed item.
            iterable: Source of items; ``iter()`` is taken once, here.
            func: Callable invoked as
                ``func(tasker, item, *args, **kwargs)``.
            *args: Extra positional arguments for ``func``.
            **kwargs: Extra keyword arguments for ``func``.
        """
        self.queue = queue
        self.iterator = iter(iterable)
        self.func = func
        self.args = tuple(args)
        self.kwargs = kwargs

    def __call__(self, tasker):
        """Process the next item and re-queue this task.

        Returns:
            Whatever ``func`` returned for the item.

        Raises:
            StopIteration: When the iterable is exhausted; the task is then
                not re-queued. TaskQueueManager catches this exception
                around its call to the queue's ``next()``.
        """
        # Use the builtin next(): Python 3 iterators expose __next__, not
        # a .next() method, so the old spelling raised AttributeError.
        item = next(self.iterator)
        result = self.func(tasker, item, *self.args, **self.kwargs)
        self.queue.tasks.appendleft(self)
        return result
class TaskQueueManager(object):
    """Scheduler that interleaves tasks from several named queues.

    Every queue has a "due time" computed as::

        run_times[name] + intervals[name] + intrinsic_delay[name]

    The run loop repeatedly picks the queue with the earliest due time,
    sleeps until that time if necessary, and executes one task from it.

    Attributes:
        task_queues: Mapping of queue name to queue object.
        intervals: Per-queue delay requested via ``interval()``; reset to
            0.0 right before each task of that queue runs.
        run_times: Per-queue timestamp of the last task start (or of the
            start of the current run / the queue's creation).
        intrinsic_delay: Per-queue delay accumulated via ``schedule()``.
    """

    def __init__(self):
        self.task_queues = {}
        self._stop = False
        self.intervals = {}
        self.run_times = {}
        self.intrinsic_delay = {}
        # Name of the queue whose task is currently executing.
        self._current = None

    def interval(self, amount):
        """Delay the next task of the currently running queue by ``amount``.

        Intended to be called from inside a task.

        Raises:
            KeyError: If no queue is current (nothing has run yet).
        """
        self.intervals[self._current] += amount

    def create(self, name):
        """Create (or replace) the queue called ``name`` with zero delays."""
        self.task_queues[name] = TaskList(self, name)
        self.intervals[name] = 0
        self.intrinsic_delay[name] = 0
        # Register a reference time immediately so a queue created by a
        # task while the loop is running can be scheduled without KeyError.
        self.run_times[name] = time.time()

    def get(self, name):
        """Return the queue called ``name`` (KeyError if it does not exist)."""
        return self.task_queues[name]

    def add(self, name, func, *args, **kwargs):
        """Queue a one-shot call ``func(tasker, *args, **kwargs)`` on ``name``.

        The queue is created on first use.
        """
        if name not in self.task_queues:
            self.create(name)
        queue = self.task_queues[name]
        queue.add(DefaultTask(queue, func, *args, **kwargs))

    def add_while(self, name, predicate, func, *args, **kwargs):
        """Queue ``func`` on ``name`` to repeat while ``predicate()`` is truthy.

        The queue is created on first use.
        """
        if name not in self.task_queues:
            self.create(name)
        queue = self.task_queues[name]
        queue.add(WhileTask(queue, predicate, func, *args, **kwargs))

    def add_for(self, name, iterable, func, *args, **kwargs):
        """Queue ``func`` on ``name`` to run once per item of ``iterable``.

        The queue is created on first use.
        """
        if name not in self.task_queues:
            self.create(name)
        queue = self.task_queues[name]
        queue.add(ForTask(queue, iterable, func, *args, **kwargs))

    def schedule(self, name, delay):
        """Add ``delay`` to the intrinsic delay of the queue ``name``.

        Raises:
            KeyError: If the queue has never been created.
        """
        self.intrinsic_delay[name] += delay

    def stop(self):
        """Ask the running loop to exit before starting another task."""
        self._stop = True

    def run_for(self, duration):
        """Run tasks for at most ``duration`` seconds.

        No task is started once the deadline has passed, and the loop no
        longer sleeps beyond the deadline waiting for a task that is due
        later. Returns early when every queue has finished or ``stop()``
        was called.

        Returns:
            dict mapping queue name to the list of values its tasks
            returned, in execution order.
        """
        # NOTE(review): unlike run(), this entry point has never cleared
        # intrinsic_delay after a task runs; that difference is preserved
        # here. Confirm whether it is intentional.
        return self._run_loop(duration=duration, reset_delay=False)

    def max_times(self):
        """Return a dict mapping each queue name to its current due time."""
        return {
            queue.name: (
                self.intervals[queue.name]
                + self.run_times[queue.name]
                + self.intrinsic_delay[queue.name]
            )
            for queue in self.task_queues.values()
        }

    def run(self):
        """Run tasks until every queue has finished or ``stop()`` is called.

        The intrinsic delay of a queue is cleared after each of its tasks.

        Returns:
            dict mapping queue name to the list of values its tasks
            returned, in execution order.
        """
        return self._run_loop(duration=None, reset_delay=True)

    @staticmethod
    def _sleep_until(moment):
        """Sleep until ``moment`` (epoch seconds); return at once if past."""
        remaining = moment - time.time()
        # Guard against a negative value, which time.sleep() rejects.
        if remaining > 0:
            time.sleep(remaining)

    def _run_loop(self, duration, reset_delay):
        """Shared scheduling loop behind ``run()`` and ``run_for()``.

        Args:
            duration: Seconds to keep running, or None for no time limit.
            reset_delay: Whether to zero a queue's intrinsic delay after
                each of its tasks.
        """
        started = time.time()
        self.run_times = {
            queue.name: started for queue in self.task_queues.values()
        }
        results = {queue.name: [] for queue in self.task_queues.values()}
        deadline = None if duration is None else time.time() + duration

        while not self._stop:
            due_times = self.max_times()
            if not due_times:
                # Every queue has finished (or none existed); previously
                # min() on the empty mapping raised ValueError here.
                break
            name = min(due_times, key=due_times.get)
            # Look the queue up live so queues created mid-run are found.
            queue = self.task_queues[name]
            due = due_times[name]

            if deadline is not None and due > deadline:
                # The earliest task cannot start inside the time window:
                # wait out the remainder instead of sleeping past it.
                self._sleep_until(deadline)
                break
            self._sleep_until(due)
            if deadline is not None and time.time() >= deadline:
                break
            if self._stop:
                break

            self._current = name
            self.intervals[name] = 0.0
            self.run_times[name] = time.time()
            try:
                value = queue.next()
            except StopIteration:
                # NOTE(review): the whole queue is dropped on StopIteration,
                # whether it came from an empty queue or from a task; any
                # tasks still queued behind that task are discarded.
                self.task_queues.pop(name, None)
            else:
                results.setdefault(name, []).append(value)
            if reset_delay:
                self.intrinsic_delay[name] = 0

        # Re-arm so the manager can be run again after a stop().
        self._stop = False
        return results
def taskify(f):
    """Adapt a plain function so it can be queued as a task.

    Task callables receive the tasker as their first argument. The
    returned wrapper accepts that argument, discards it, and forwards the
    remaining arguments to ``f``.

    Args:
        f: Callable that does not expect a tasker argument.

    Returns:
        Wrapper callable as ``wrapper(tasker, *args, **kwargs)`` returning
        ``f(*args, **kwargs)``. It keeps the name and docstring of ``f``.
    """
    # Function-scope import keeps this block self-contained.
    import functools

    @functools.wraps(f)
    def __inner__(tasker, *args, **kwargs):
        return f(*args, **kwargs)

    return __inner__
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment