Last active
June 25, 2021 10:01
-
-
Save djosix/071a03bcda9dae8ae9ca7cf8726e8798 to your computer and use it in GitHub Desktop.
Python directory-based command queue executor
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import json | |
import re | |
import glob | |
import time | |
import traceback | |
import functools | |
import subprocess as sp | |
from datetime import datetime | |
from threading import Thread, Event | |
def no_raise_wrapper(function):
    """Wrap *function* so ordinary exceptions are printed instead of raised.

    The wrapper forwards all positional and keyword arguments and returns
    whatever *function* returns.  If *function* raises an ``Exception``, the
    formatted traceback is printed and ``None`` is returned.

    Only ``Exception`` subclasses are suppressed: ``KeyboardInterrupt`` and
    ``SystemExit`` still propagate, so the process can be interrupted while a
    wrapped function is running.
    """
    @functools.wraps(function)
    def _wrapper(*args, **kwargs):
        try:
            return function(*args, **kwargs)
        except Exception:
            print(traceback.format_exc())
            return None
    return _wrapper
def sanitize_env_dict(env_dict):
    """Return a validated copy of *env_dict* with every value turned into ``str``.

    Every key must consist solely of ASCII letters, digits and underscores.
    The input mapping is not modified.

    Raises:
        AssertionError: if a key contains any other character (including a
            trailing newline) or is empty.
    """
    sanitized = {}
    for key, value in env_dict.items():
        # fullmatch instead of match(r'^...$'): `$` also matches just before a
        # trailing newline, which would let a key such as 'KEY\n' through.
        if re.fullmatch(r'[a-zA-Z0-9_]+', key) is None:
            # Raised explicitly rather than with an `assert` statement so the
            # check still runs under `python -O`; the exception type is the
            # one the assert used to produce.
            raise AssertionError(
                'invalid environment variable name: {!r}'.format(key))
        sanitized[key] = str(value)
    return sanitized
class Worker(Thread):
    """Daemon thread that runs one submitted task at a time as a subprocess.

    A task is a dict whose ``'popen'`` entry holds keyword arguments for
    ``subprocess.check_call``.  The owner hands a task over with
    :meth:`submit`, polls :meth:`is_available` and collects the finished task
    (now carrying a ``'run'`` entry) with :meth:`retrieve`.  Submitting
    ``None`` makes the thread leave its loop.

    Args:
        name: thread name.
        env: extra environment variables for every subprocess started by this
            worker; validated and stringified by ``sanitize_env_dict``.
        attrs: free-form attributes stored on the worker as ``self.attrs``.
    """

    def __init__(self, name, env=None, attrs=None):
        super().__init__(name=name, daemon=True)
        # `None` defaults instead of `{}` so workers never share one dict.
        self.env = sanitize_env_dict({} if env is None else env)
        self.attrs = {} if attrs is None else attrs
        self.__running_flag = Event()
        self.__current_task = None

    def submit(self, task):
        """Hand *task* (a dict, or ``None`` to stop the thread) to the worker."""
        assert self.is_available()
        assert isinstance(task, (dict, type(None)))
        self.__current_task = task
        # Setting the flag wakes up the loop in run().
        self.__running_flag.set()

    def is_available(self):
        """Return True when the thread is alive and not busy with a task."""
        return self.is_alive() and not self.__running_flag.is_set()

    def retrieve(self):
        """Return the last finished task and forget it.

        Returns ``None`` while a task is still running or when there is
        nothing to collect.
        """
        if self.__running_flag.is_set():
            return None
        task = self.__current_task
        self.__current_task = None
        return task

    def run(self):
        """Thread body: wait for a task, execute it, clear the busy flag."""
        while True:
            self.__running_flag.wait()
            if self.__current_task is None:
                # A submitted `None` is the stop signal.
                self.__running_flag.clear()
                break
            if isinstance(self.__current_task, dict):
                self.__execute_task()
            else:
                # Anything that is not a dict is reported back as failed.
                self.__current_task = {
                    '__invalid_task': self.__current_task,
                    'run': {'completed': False}
                }
            self.__running_flag.clear()

    def __execute_task(self):
        """Run the current task and record the outcome under ``task['run']``.

        ``run`` receives ``start_time``, ``end_time``, ``completed``, the
        ``worker`` that ran it and, on failure, a ``traceback`` string.
        """
        task = self.__current_task
        run_info = task['run'] = {}
        run_info['start_time'] = time.time()
        opened_files = []
        try:
            kwargs = dict(task['popen'])

            # Popen's `env` replaces the child's environment instead of
            # extending it, so start from the parent environment and layer
            # the worker's and then the task's variables on top.
            env = os.environ.copy()
            env.update(self.env)
            env.update(kwargs.get('env') or {})
            kwargs['env'] = env

            # NOTE(review): string stream values are assumed to be file
            # paths (the task file can only carry strings or the integer
            # subprocess constants) -- confirm.  Popen cannot take a str, so
            # open them here; output files are appended to, never truncated.
            for stream, mode in (('stdin', 'rb'), ('stdout', 'ab'), ('stderr', 'ab')):
                target = kwargs.get(stream)
                if isinstance(target, str):
                    handle = open(target, mode)
                    opened_files.append(handle)
                    kwargs[stream] = handle

            sp.check_call(**kwargs)
            run_info['completed'] = True
        except Exception:
            run_info['completed'] = False
            run_info['traceback'] = traceback.format_exc()
        finally:
            for handle in opened_files:
                handle.close()
        run_info['end_time'] = time.time()
        run_info['worker'] = self  # XXX: not serializable
class QueueExecutor:
    """Execute JSON task files from a directory-based queue on worker threads.

    Task files live in four sub-directories of ``queue_dir`` -- ``pending``,
    ``running``, ``done`` and ``failed`` -- and are moved from one to the
    next as they progress.  All four directories are created on construction.

    Args:
        queue_dir: root directory of the queue.
        polling_interval: seconds to sleep between two scans of the workers.
    """

    def __init__(
        self,
        queue_dir='queue',
        polling_interval=0.5,
    ):
        self.queue_dir = queue_dir
        self.polling_interval = polling_interval
        self.workers = []
        self.callbacks = {}
        self.subdirs = {
            name: os.path.join(self.queue_dir, name)
            for name in ['pending', 'running', 'done', 'failed']
        }
        for dir_path in self.subdirs.values():
            os.makedirs(dir_path, exist_ok=True)

    def run(self):
        """Start all workers and dispatch pending tasks forever.

        On every pass each idle worker first has its finished task collected
        (moved to ``done`` or ``failed`` and reported to the callbacks) and
        is then given the oldest pending task, if any.  This method does not
        return.
        """
        assert len(self.workers) > 0
        for worker in self.workers:
            worker.start()

        @no_raise_wrapper
        def handle_retrieved_task(task):
            next_stage = 'done' if task['run']['completed'] else 'failed'
            self.__move_to_subdir(task, next_stage)
            self.__callback(next_stage, task)

        while True:
            queue_empty = False
            for worker in self.workers:
                if not worker.is_available():
                    continue
                task = worker.retrieve()
                if task is not None:
                    handle_retrieved_task(task)
                # Once the queue is known to be empty, skip further directory
                # scans in this pass but keep visiting the remaining workers
                # so their finished tasks are still collected.
                if queue_empty:
                    continue
                new_task = self.__get_pending()
                if new_task is None:
                    queue_empty = True
                    continue
                # Move the file before handing the task over; if the move
                # fails (e.g. the file vanished) the task is not executed.
                if not self.__move_to_subdir(new_task, 'running'):
                    continue
                worker.submit(new_task)
                self.__callback('running', new_task)
            time.sleep(self.polling_interval)

    def add_worker(self, name, env=None, attrs=None):
        """Create a ``Worker`` and add it to the pool (it is started by run())."""
        # Fresh dicts per call so workers never share a default argument.
        env = {} if env is None else env
        attrs = {} if attrs is None else attrs
        self.workers.append(Worker(name, env, attrs))

    def add_callback(self, stage, callback):
        """Register ``callback(task)`` for *stage*.

        *stage* is one of ``'running'``, ``'done'`` or ``'failed'``.  The
        callback is wrapped with ``no_raise_wrapper`` before it is stored.
        """
        assert stage in ('running', 'done', 'failed')
        stage_callbacks = self.callbacks.setdefault(stage, [])
        stage_callbacks.append(no_raise_wrapper(callback))

    def submit_task(
        self,
        name,
        args,
        executable=None,
        stdin=None,
        stdout=None,
        stderr=None,
        cwd=None,
        env=None,
        attrs=None,
    ):
        """Write a new task file into the ``pending`` directory.

        Args:
            name: task name; becomes part of the file name.
            args: command to run.  A ``str`` is run through the shell, a
                list/tuple is stringified element-wise and run without one.
            executable, cwd: optional strings stored as given.
            stdin: optional ``str`` or ``subprocess.DEVNULL``.
            stdout: optional ``str`` or ``subprocess.DEVNULL``.
            stderr: optional ``str``, ``subprocess.STDOUT`` or
                ``subprocess.DEVNULL``.
            env: optional dict of environment variables; validated with
                ``sanitize_env_dict``.
            attrs: optional dict of extra attributes stored with the task;
                ``name``, ``file_name`` and ``create_time`` are filled in
                unless already present.

        Returns:
            The path of the task file that was written.
        """
        popen_kwargs = {}

        assert isinstance(args, (str, tuple, list))
        if isinstance(args, str):
            shell = True
        else:
            args = list(map(str, args))
            shell = False
        popen_kwargs['args'] = args
        popen_kwargs['shell'] = shell

        if executable is not None:
            assert isinstance(executable, str)
            popen_kwargs['executable'] = executable
        if stdin is not None:
            assert isinstance(stdin, str) or stdin == sp.DEVNULL
            popen_kwargs['stdin'] = stdin
        if stdout is not None:
            assert isinstance(stdout, str) or stdout == sp.DEVNULL
            popen_kwargs['stdout'] = stdout
        if stderr is not None:
            assert isinstance(stderr, str) or stderr in (sp.STDOUT, sp.DEVNULL)
            popen_kwargs['stderr'] = stderr
        if cwd is not None:
            assert isinstance(cwd, str)
            popen_kwargs['cwd'] = cwd
        if env is not None:
            assert isinstance(env, dict)
            popen_kwargs['env'] = sanitize_env_dict(env)

        create_time = time.time()
        datetime_str = datetime.fromtimestamp(create_time).strftime('%Y%m%d_%H%M%S')
        file_name = '{}-{}.task.json'.format(datetime_str, name)
        json_path = os.path.join(self.subdirs['pending'], file_name)

        attrs = {} if attrs is None else attrs.copy()
        attrs.setdefault('name', name)
        attrs.setdefault('file_name', file_name)
        attrs.setdefault('create_time', create_time)

        task_dict = {
            'popen': popen_kwargs,
            'attrs': attrs
        }

        # Write under a name the '*.task.json' scan does not match and then
        # rename, so a running executor never reads a half-written file.
        tmp_path = json_path + '.tmp'
        with open(tmp_path, 'w') as f:
            json.dump(task_dict, f, indent=2)
        os.replace(tmp_path, json_path)
        return json_path

    def __get_pending(self):
        """Return the oldest loadable pending task, or ``None`` if there is none.

        The returned dict carries the file location under ``'__path'``.
        Files that do not contain a JSON object are moved to ``failed``.
        """
        pattern = os.path.join(self.subdirs['pending'], '*.task.json')

        def modified_time(path):
            try:
                return os.path.getmtime(path)
            except OSError:
                # The file disappeared between the scan and the stat call.
                return float('inf')

        for pending_path in sorted(glob.glob(pattern), key=modified_time):
            try:
                # The file is closed again before it may have to be moved.
                with open(pending_path) as f:
                    task = json.load(f)
                if not isinstance(task, dict):
                    raise TypeError('task file must contain a JSON object')
            except OSError:
                # Unreadable or already gone: leave it and try the next one.
                continue
            except (ValueError, TypeError):
                self.__move_to_subdir({'__path': pending_path}, 'failed')
                continue
            task['__path'] = pending_path
            return task
        return None

    @no_raise_wrapper
    def __move_to_subdir(self, task, stage):
        """Move the task's file into the *stage* directory.

        Updates ``task['__path']`` and returns ``True`` on success.  On
        failure the exception is swallowed by ``no_raise_wrapper`` (so the
        call evaluates to ``None``) and ``task['__path']`` is left unchanged.
        """
        assert stage in ('running', 'done', 'failed')
        source_path = task['__path']
        source_name = os.path.basename(source_path)
        target_path = os.path.join(self.subdirs[stage], source_name)
        os.rename(source_path, target_path)
        task['__path'] = target_path
        return True

    def __callback(self, stage, task):
        """Invoke every callback registered for *stage* with *task*."""
        # .get(): a stage without registered callbacks must not raise.
        for callback in self.callbacks.get(stage, ()):
            callback(task)
if __name__ == '__main__':
    import random

    # Demo: queue 30 short shell tasks and run them on four workers, each
    # with its own CUDA_VISIBLE_DEVICES value.
    qe = QueueExecutor()

    for index in range(30):
        seconds = random.randint(1, 3)
        qe.submit_task(f'task_{index}', f'''
echo Training with CUDA_VISIBLE_DEVICES=$CUDA_VISIBLE_DEVICES
sleep {seconds}
''')

    for gpu_id in range(4):
        qe.add_worker(f'GPU-{gpu_id}', {'CUDA_VISIBLE_DEVICES': gpu_id})
    print('workers:', qe.workers)

    def make_callback(stage):
        """Build a callback that announces *stage* for the task it receives."""
        def _callback(task):
            task_name = task['attrs']['name']
            # A failed task carries the traceback recorded by its worker.
            has_traceback = 'run' in task and 'traceback' in task['run']
            if has_traceback:
                print(task['run']['traceback'])
            print(f'[callback at stage={stage!r} task={task_name!r}]')
        return _callback

    for stage_name in ('running', 'done', 'failed'):
        qe.add_callback(stage_name, make_callback(stage_name))
    print('callbacks:', qe.callbacks)

    print('run')
    qe.run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment