Last active
December 18, 2017 21:33
-
-
Save paulwinex/e0c85fd5a7c139d9b0e1f5a1011cecb9 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import hashlib
import json
import logging as _logging
import os
import random
import shlex
import shutil
import subprocess
import sys
import tempfile

from PySide.QtCore import *
# Module-level logger named after this module. Its level is set to DEBUG so
# the debug traces written by the classes below are not filtered by it.
logger = _logging.getLogger(__name__)
logger.setLevel(_logging.DEBUG)
class ProcessControl(QObject):
    """
    Control a file-backed queue of external processes.

    Every task is described by a JSON meta file stored below a data folder
    in the system temp directory. The meta file is moved between the
    ``.queue``, ``.in_progress``, ``.error`` and ``.completed`` sub folders
    as the task advances, and each running task is driven by a
    ``ProcessWorker``.
    """
    processCreatedSignal = Signal(str, object)  # process_id, json_data
    processOutputSignal = Signal(str, str)  # process_id, message
    processStartedSignal = Signal(str)  # process_id
    # NOTE(review): processResultSignal and progressSignal are declared but
    # never emitted by this class.
    processResultSignal = Signal(str, object)  # process_id, json_data
    progressSignal = Signal(str, object)  # process_id, progress_data
    processErrorSignal = Signal(str, str)  # process_id, message
    processFinishedSignal = Signal(str, object)  # process_id, code
    queueEmptySignal = Signal()
    startNextSignal = Signal()

    _process_data_folder_name = '.process_data_files'
    _folder_queue = '.queue'
    _folder_in_progress = '.in_progress'
    _folder_error = '.error'
    _folder_completed = '.completed'
    _binding_file_name = '.process_control_bind_file'

    def __init__(self, max_process=None):
        """
        Prepare the data folder, take the binding file and start the queue.

        :param max_process: maximum number of processes running at the same
            time, 2 is used when the value is empty
        :raises Exception: when the temp folder does not exist, the data
            folder cannot be created or the binding file cannot be taken
        """
        super(ProcessControl, self).__init__()
        self.max_process = max_process or 2
        self._process_workers = {}  # uuid -> ProcessWorker
        self.stopped = False
        self.__bind_file = None
        tmp_dir = tempfile.gettempdir()
        if not os.path.exists(tmp_dir):
            err = 'Temp folder not exists: %s' % tmp_dir
            logger.critical(err)
            raise Exception(err)
        self.out_dir = os.path.join(tmp_dir, self._process_data_folder_name)
        if not os.path.exists(self.out_dir):
            try:
                os.makedirs(self.out_dir)
            except OSError as e:
                err = 'Cant create temp folder: %s' % str(e)
                logger.critical(err)
                raise Exception(err)
        if not self._allow_to_start():
            raise Exception('Error locked file. Process control already started?')
        self._bind_file()
        self.move_incompleted_to_queue()
        logger.debug('PROCESS STARTED')
        self.startNextSignal.connect(self.start_next)
        self.startNextSignal.emit()

    def __del__(self):
        # The attribute is read through its mangled name with a default so a
        # partially constructed instance does not fail during destruction.
        bind_file = getattr(self, '_ProcessControl__bind_file', None)
        if bind_file:
            bind_file.close()

    @property
    def _binding_file(self):
        """
        :return: str path of the binding file inside the data folder
        """
        return os.path.join(self.out_dir, self._binding_file_name)

    def _allow_to_start(self):
        """
        Binding file is free?

        The check tries to remove an existing binding file; a failure to
        remove it is treated as "held by another instance".
        NOTE(review): this presumably only detects a second instance on
        platforms that refuse to delete an open file - confirm.

        :return: bool
        """
        if os.path.exists(self._binding_file):
            try:
                os.remove(self._binding_file)
            except OSError:
                return False
        return True

    def _bind_file(self):
        """
        Create and hold open the binding file to disable start of multiple
        instances of process_control in the same root

        :raises Exception: when an existing binding file cannot be removed
        """
        if os.path.exists(self._binding_file):
            try:
                os.remove(self._binding_file)
            except OSError:
                raise Exception('Error start process, binding file is locked in {}'.format(self.out_dir))
        self.__bind_file = open(self._binding_file, 'w')

    def move_incompleted_to_queue(self):
        """
        Move meta files from "in_progress" back to "queue" on startup, so
        tasks interrupted by a crash of process_control are executed again
        """
        in_progress = os.path.join(self.out_dir, self._folder_in_progress)
        if not os.path.isdir(in_progress):
            # nothing was ever started from this data folder
            return
        queue = os.path.join(self.out_dir, self._folder_queue)
        if not os.path.isdir(queue):
            # shutil.move would otherwise rename the first file to ".queue"
            try:
                os.makedirs(queue)
            except OSError as e:
                logger.error(str(e))
                return
        for f in os.listdir(in_progress):
            try:
                shutil.move(os.path.join(in_progress, f), queue)
            except Exception as e:
                logger.error(str(e))

    def set_max_process_count(self, val):
        """
        Change maximum of process count in same time. The value is clamped
        to the range 1..25.

        :param val: int
        """
        clean = min(max(1, val), 25)
        logger.debug('Set max process count to %s' % clean)
        self.max_process = clean

    def open_temp_folder(self):
        """
        Open the data folder in the file manager of the system
        """
        self.__open_folder(self.out_dir)

    def add_task(self, cmd, args=None, name=None, env=None, workdir=None):
        """
        Add new process task

        :param cmd: executable
        :param args: list of arguments
        :param name: name of task
        :param env: dict? environment variables
        :param workdir: work dir for process
        :return: str uuid of task, None when the meta file was not saved
        """
        data = dict(
            cmd=cmd,
            args=args or [],
            name=name,
            env=env,
            workdir=workdir
        )
        uuid = self.__generate_uid(data)
        data['__uuid'] = uuid
        out_file = os.path.join(self.out_dir, self._folder_queue, uuid)
        if not os.path.exists(os.path.dirname(out_file)):
            os.makedirs(os.path.dirname(out_file))
        data['__data_file'] = out_file
        try:
            # save meta file to queue
            with open(out_file, 'w') as f:
                json.dump(data, f, indent=2)
        except Exception as e:
            logger.error('Cant save data to json: %s' % e)
            # do not leave a half written meta file in the queue
            if os.path.exists(out_file):
                try:
                    os.remove(out_file)
                except OSError as remove_error:
                    logger.debug('Cant remove broken meta file: %s' % remove_error)
            return
        logger.debug('Add task %s' % name)
        self.processCreatedSignal.emit(uuid, data)
        self.startNextSignal.emit()
        return uuid

    def start_next(self):
        """
        Start next task from the queue if the control is not paused and the
        process limit is not reached
        """
        if self.stopped:
            logger.debug('Process Control Is Stopped')
            return
        if len(self._process_workers) >= self.max_process:
            logger.debug('Process count limit')
            return
        next_file = self.get_next_data()
        if not next_file:
            logger.debug('Pool empty')
            self.queueEmptySignal.emit()
            return
        uuid = os.path.basename(next_file)
        if uuid in self._process_workers:
            logger.error('Meta file already in progress: %s' % uuid)
            self.startNextSignal.emit()
            return
        try:
            with open(next_file) as f:
                task_data = json.load(f)
            if not isinstance(task_data, dict):
                raise ValueError('Meta file should contain a json object')
        except Exception:
            self.move_to_error(next_file, 'Can`t read meta file')
            self.processErrorSignal.emit(uuid, 'Can`t read meta file: %s' % next_file)
            # the broken file left the queue, continue with the next one
            self.startNextSignal.emit()
            return
        next_file = self.move_to_in_progress(next_file)
        process = ProcessWorker(task_data)
        # The value emitted by a signal is always the first positional
        # argument of the slot, the remaining values are bound as defaults.
        process.outputSignal.connect(lambda msg, uid=uuid: self.process_output(uid, msg))
        process.errorSignal.connect(lambda msg, uid=uuid, f=next_file: self.process_error(uid, msg, f))
        process.closedSignal.connect(lambda code, uid=uuid, f=next_file: self.process_closed(uid, code, f))
        self._process_workers[uuid] = process
        # work dir
        wd = task_data.get('workdir')
        if wd:
            cur = os.getcwd()
            QDir.setCurrent(wd)
            try:
                process.start()
            finally:
                # always restore the work dir of the host application
                QDir.setCurrent(cur)
        else:
            process.start()
        if uuid in self._process_workers:
            # skipped when the worker was already closed during start()
            self.processStartedSignal.emit(uuid)
            logger.debug('Process started %s' % process)
        # One call starts one process only, so ask for another start while
        # there are free slots and queued files (e.g. after a restart).
        if len(self._process_workers) < self.max_process and self.get_next_data():
            self.startNextSignal.emit()

    def process_output(self, uuid, message):
        """
        Send process output with uuid

        :param uuid: uuid of task
        :param message: text message
        """
        self.processOutputSignal.emit(uuid, message)

    def process_error(self, uuid, message, data_file):
        """
        Send error of process with uuid, move its meta file to the error
        folder and stop the worker if it is still registered

        :param uuid: uuid of task
        :param message: error text
        :param data_file: task data file
        """
        # signal
        self.processErrorSignal.emit(uuid, message)
        # move file
        if data_file:
            self.move_to_error(data_file, '%s' % (message,))
        # kill process
        proc = self._process_workers.get(uuid)
        if proc:
            proc.stop()

    def process_closed(self, uuid, code, data_file):
        """
        Process finished and closed

        :param uuid: uuid of task
        :param code: exit code
        :param data_file: task data file
        """
        logger.debug('Process closed with code %s' % code)
        proc = self._process_workers.pop(uuid, None)
        if proc is None:
            logger.error('Completed Process %s not exists in array' % uuid)
            return
        proc.deleteLater()
        if code == 0:
            # exit normal
            if data_file:
                self.move_to_complete(data_file)
        else:
            # exit error
            self.process_error(uuid, 'Process exit with error code {}. Check outputs to details.'.format(code),
                               data_file)
        # process finished signal
        self.processFinishedSignal.emit(uuid, code)
        # start next
        self.startNextSignal.emit()

    def __generate_uid(self, data):
        """
        Create unique id from data + random

        :param data: object
        :return: uuid, md5 hex digest
        """
        raw = ''.join([str(data), str(random.random())])
        if not isinstance(raw, bytes):
            # hashlib accepts bytes only on Python 3
            raw = raw.encode('utf-8')
        return hashlib.md5(raw).hexdigest()

    def get_next_data(self):
        """
        Return next data file from queue sorted by creation date. Files
        whose name is registered as a running worker are skipped.

        :return: str path, None when the queue folder is missing or empty
        """
        queue_dir = os.path.join(self.out_dir, self._folder_queue)
        if not os.path.exists(queue_dir):
            logger.error('Queue dir not exists')
            return

        def created(path):
            # a file removed in the meantime is sorted to the end
            try:
                return os.stat(path).st_ctime
            except OSError:
                return float('inf')

        candidates = []
        for name in os.listdir(queue_dir):
            if name in self._process_workers:
                continue
            path = os.path.normpath(os.path.join(queue_dir, name))
            if os.path.isfile(path):
                candidates.append(path)
        if not candidates:
            return None
        return min(candidates, key=created)

    def move_to_error(self, path, msg=None):
        """
        Store the error text in the data file and move it to error dir

        :param path: data file path
        :param msg: error message
        :return: str new path, None when the data file does not exist
        """
        if not path or not os.path.exists(path):
            # Already moved by a previous error of the same task. Opening the
            # path for append would create a new junk file.
            return None
        # add error text to file
        try:
            with open(path) as f:
                data = json.load(f)
            data['error'] = msg
            with open(path, 'w') as f:
                json.dump(data, f, indent=2)
        except Exception as e:
            # the file is not a readable json object, append the reason as text
            with open(path, 'a') as f:
                f.write('\n\n{}'.format(str(e)))
        return self._move_to(path, self._folder_error)

    def move_to_complete(self, path):
        """
        Move data file to completed dir

        :param path: data file path
        :return: str new path or None
        """
        return self._move_to(path, self._folder_completed)

    def move_to_in_progress(self, path):
        """
        Move data file to in_progress dir

        :param path: data file path
        :return: str new path or None
        """
        return self._move_to(path, self._folder_in_progress)

    def _move_to(self, path, folder):
        """
        Move data file to a sub folder of the data folder

        :param path: data file path
        :param folder: name of the sub folder
        :return: str new path, None when the data file does not exist
        """
        if not os.path.exists(path):
            return
        new_path = os.path.normpath(os.path.join(self.out_dir, folder, os.path.basename(path)))
        if not os.path.exists(os.path.dirname(new_path)):
            os.makedirs(os.path.dirname(new_path))
        shutil.move(path, new_path)
        return new_path

    @classmethod
    def __open_folder(cls, path):
        """
        Open folder in the file manager of the system

        :param path: folder path
        :return:
        """
        if os.name == 'nt':
            os.startfile(path)
            return
        # "open" exists on macOS only, other posix desktops use xdg-open
        opener = 'open' if sys.platform == 'darwin' else 'xdg-open'
        try:
            # argument list without a shell, so the path needs no quoting
            subprocess.call([opener, path])
        except OSError as e:
            logger.error('Cant open folder %s: %s' % (path, e))

    def pause(self):
        """
        Stop queue but not current processes
        """
        self.stopped = True

    def resume(self):
        """
        Resume queue
        """
        self.stopped = False
        self.startNextSignal.emit()
class ProcessWorker(QObject):
    """
    This class works with QProcess: it runs the single task described by a
    data dict and reports output, errors and the exit code through signals
    """
    outputSignal = Signal(str)  # output message
    errorSignal = Signal(str)  # error message
    closedSignal = Signal(object)  # exit code

    # code sent with closedSignal when the process could not be started
    _not_started_code = -1

    def __init__(self, data):
        """
        :param data: dict of the task, the keys "cmd", "args", "name",
            "env" and "__uuid" are read by this class
        """
        super(ProcessWorker, self).__init__()
        self.data = data
        self.proc = None
        # The name is always defined, so __repr__ works for broken tasks too.
        self.name = self.data.get('name') or self._command_name(self.data.get('cmd'))

    @staticmethod
    def _command_name(cmd):
        """
        Base name of the executable in a command line

        :param cmd: command line text
        :return: str name, None when it can not be detected
        """
        if not cmd:
            return None
        try:
            parts = shlex.split(cmd)
        except (ValueError, TypeError, AttributeError):
            # e.g. unbalanced quotes, use the command as is
            parts = [str(cmd)]
        return os.path.basename(parts[0]) if parts else None

    def __repr__(self):
        return ('<ProcessWorker #%s "%s">' % (self.data.get('__uuid'), self.name or 'noname')).strip()

    def start(self):
        """
        Main start method
        """
        cmd = self.data.get('cmd')
        if not cmd:
            # Reported here and not in __init__, because nobody can be
            # connected to the signals while the object is being created.
            self.errorSignal.emit('Commend is empty')
            self.closedSignal.emit(self._not_started_code)
            return
        self.proc = QProcess()
        self.proc.setProcessChannelMode(QProcess.MergedChannels)
        self.proc.readyRead.connect(self.send_output)
        self.proc.finished.connect(self.send_finished)
        self.proc.error.connect(self._send_error)
        env = self.data.get('env')
        if isinstance(env, dict) and env:
            qenv = QProcessEnvironment.systemEnvironment()
            for k, v in env.items():
                qenv.insert(str(k), str(v))
            self.proc.setProcessEnvironment(qenv)
        args = self.data.get('args', [])
        if not isinstance(args, list):
            logger.error('Args should be list type')
            args = []
        str_args = [str(x) for x in args]
        logger.debug('COMMAND: %s' % ' '.join([cmd] + str_args))
        self.proc.start(cmd, str_args)

    def _send_error(self, error):
        """
        Translate an error of QProcess to the text based errorSignal

        :param error: QProcess.ProcessError value
        """
        self.errorSignal.emit(self.proc.errorString())
        if error == QProcess.FailedToStart:
            # QProcess does not emit finished() for a process which never
            # started, so report the end of the task from here.
            self.closedSignal.emit(self._not_started_code)

    def send_output(self):
        """
        Catch process output
        """
        if self.proc is None:
            return
        output = self.proc.readAll().data()
        if not isinstance(output, str):
            # Python 3 returns bytes, undecodable bytes are replaced
            output = output.decode('utf-8', 'replace')
        output = output.strip()
        if output:
            # send
            self.outputSignal.emit(output)

    def stop(self):
        """
        Stop process. Does nothing when the process was not created or is
        not running.
        """
        if self.proc is None:
            return
        if self.proc.state() != QProcess.NotRunning:
            self.proc.kill()

    def send_finished(self, code):
        """
        Send signal on process stopped

        :param code: exit code
        """
        self.closedSignal.emit(code)
if __name__ == '__main__':
    # Demo payload executed as a child process: prints a few progress lines
    # and a result line, or exits with code 3 when "error" is in the arguments.
    import sys, json, time

    def msg(text):
        """Write text to stdout on a new line and flush it at once."""
        stream = sys.stdout
        stream.write('\n' + str(text))
        stream.flush()

    msg('Start process')
    for percent in range(0, 100, 20):
        time.sleep(0.5)
        msg('Progress: {}%'.format(percent))
        # simulate a failure in the second half of the work
        if percent > 50 and 'error' in sys.argv:
            sys.exit(3)
    payload = dict(value1=1, value2=2)
    encoded = json.dumps(payload)
    msg('Result:::{}:::'.format(encoded))
    msg('Complete')
######################### TESTS ############################
# Manual smoke test, disabled on purpose. It queues ten runs of this module
# as a child process, a random part of them with the "error" argument.
# NOTE(review): no Qt application or event loop is created here - presumably
# the snippet is meant to be pasted into a host which already runs one.
if False:
    import process_control
    import random
    import time
    from PySide.QtCore import QTimer
    PROCESS = ProcessControl()

    def callback(v, *args):
        # v is the name of the signal which was emitted
        print('CALLBACK "%s" >> %s' % (v, str(args)))

    def make_listener(signal_name):
        # The name is bound per listener. A lambda created in the loop body
        # would see the last loop value only, and a leading default argument
        # would be overwritten by the first value emitted by the signal.
        return lambda *args: callback(signal_name, *args)

    for signal_name in [
        'processCreatedSignal',
        'processOutputSignal',
        'processStartedSignal',
        'processResultSignal',
        'progressSignal',
        'processErrorSignal',
        'processFinishedSignal',
        'queueEmptySignal',
    ]:
        getattr(PROCESS, signal_name).connect(make_listener(signal_name))

    cmd = sys.executable
    args1 = [os.path.normpath(process_control.__file__)]
    args2 = [os.path.normpath(process_control.__file__), 'error']
    env = {'PYTHONPATH': os.getcwd()}
    PROCESS.set_max_process_count(2)
    for i in range(10):
        PROCESS.add_task(cmd,
                         args=random.choice([args1, args2]),
                         name='Test {}'.format(i),
                         env=env,
                         workdir=os.path.dirname(os.path.dirname(process_control.__file__))
                         )
        time.sleep(0.3)
    # test with crash
    # QTimer.singleShot(7000, sys.exit)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment