# process_control.py -- gist by @paulwinex (last active December 18, 2017)
# https://gist.github.com/paulwinex/e0c85fd5a7c139d9b0e1f5a1011cecb9
from PySide.QtCore import *
import sys, os, tempfile, json, random, hashlib, shlex, shutil
import logging as _logging
logger = _logging.getLogger(__name__)
logger.setLevel(_logging.DEBUG)
class ProcessControl(QObject):
    """
    This class controls the process queue
    """
    processCreatedSignal = Signal(str, object)  # process_id, json_data
    processOutputSignal = Signal(str, str)  # process_id, message
    processStartedSignal = Signal(str)  # process_id
    processResultSignal = Signal(str, object)  # process_id, json_data
    progressSignal = Signal(str, object)  # process_id, progress_data
    processErrorSignal = Signal(str, str)  # process_id, message
    processFinishedSignal = Signal(str, object)  # process_id, code
    queueEmptySignal = Signal()
    startNextSignal = Signal()
    _process_data_folder_name = '.process_data_files'
    _folder_queue = '.queue'
    _folder_in_progress = '.in_progress'
    _folder_error = '.error'
    _folder_completed = '.completed'
    _binding_file_name = '.process_control_bind_file'
    def __init__(self, max_process=None):
        super(ProcessControl, self).__init__()
        self.max_process = max_process or 2
        self._process_workers = {}  # ProcessWorker instances keyed by task uuid
        self.stopped = False
        self.__bind_file = None
        tmp_dir = tempfile.gettempdir()
        if not os.path.exists(tmp_dir):
            err = 'Temp folder does not exist: %s' % tmp_dir
            logger.critical(err)
            raise Exception(err)
        self.out_dir = os.path.join(tmp_dir, self._process_data_folder_name)
        if not os.path.exists(self.out_dir):
            try:
                os.makedirs(self.out_dir)
            except Exception as e:
                err = "Can't create temp folder: %s" % str(e)
                logger.critical(err)
                raise Exception(err)
        if not self._allow_to_start():
            raise Exception('Binding file is locked. Is another ProcessControl already running?')
        self._bind_file()
        self.move_incompleted_to_queue()
        logger.debug('PROCESS CONTROL STARTED')
        self.startNextSignal.connect(self.start_next)
        self.startNextSignal.emit()
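    # On-disk layout created under tempfile.gettempdir() (derived from the folder
    # constants above):
    #   .process_data_files/
    #       .queue/        - task json files waiting to be started
    #       .in_progress/  - tasks currently running
    #       .completed/    - tasks that exited with code 0
    #       .error/        - tasks that failed (the json gets an 'error' key)
    #       .process_control_bind_file - lock file held open by the running instance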
    def __del__(self):
        if self.__bind_file:
            self.__bind_file.close()
    @property
    def _binding_file(self):
        return os.path.join(self.out_dir, self._binding_file_name)
    def _allow_to_start(self):
        """
        Is the binding file free?
        :return: bool
        """
        if os.path.exists(self._binding_file):
            try:
                os.remove(self._binding_file)
                return True
            except Exception:
                return False
        return True
    def _bind_file(self):
        """
        Create and hold open a binding file to prevent starting multiple instances of process_control in the same root
        """
        if os.path.exists(self._binding_file):
            try:
                os.remove(self._binding_file)
            except Exception:
                raise Exception('Error starting process control, binding file is locked in {}'.format(self.out_dir))
        self.__bind_file = open(self._binding_file, 'w')
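    # Note on the lock above: holding the file open only blocks os.remove() on Windows;
    # on POSIX an open file can still be deleted, so the guard is best-effort there
    # (an assumption about the intended platform, not stated in the original gist).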
    def move_incompleted_to_queue(self):
        """
        Move tasks from "in_progress" back to "queue" on startup in case process_control crashed
        """
        in_progress = os.path.join(self.out_dir, self._folder_in_progress)
        queue = os.path.join(self.out_dir, self._folder_queue)
        # the folders are created lazily elsewhere, so they may not exist on first run
        if not os.path.isdir(in_progress):
            return
        if not os.path.isdir(queue):
            os.makedirs(queue)
        for f in os.listdir(in_progress):
            try:
                shutil.move(os.path.join(in_progress, f), queue)
            except Exception as e:
                logger.error(str(e))
    def set_max_process_count(self, val):
        """
        Change the maximum number of processes running at the same time
        :param val: int
        """
        clean = min(max(1, val), 25)
        logger.debug('Set max process count to %s' % clean)
        self.max_process = clean
    def open_temp_folder(self):
        """
        Open the temp folder in the system file browser
        """
        self.__open_folder(self.out_dir)
    def add_task(self, cmd, args=None, name=None, env=None, workdir=None):
        """
        Add new process task
        :param cmd: executable
        :param args: list of arguments
        :param name: name of task
        :param env: dict of environment variables
        :param workdir: working dir for the process
        :return: str uuid of task
        """
        data = dict(
            cmd=cmd,
            args=args or [],
            name=name,
            env=env,
            workdir=workdir
        )
        uuid = self.__generate_uid(data)
        data['__uuid'] = uuid
        out_file = os.path.join(self.out_dir, self._folder_queue, uuid)
        if not os.path.exists(os.path.dirname(out_file)):
            os.makedirs(os.path.dirname(out_file))
        data['__data_file'] = out_file
        try:
            # save meta file to queue
            with open(out_file, 'w') as f:
                json.dump(data, f, indent=2)
        except Exception:
            logger.error("Can't save task data to json")
            return
        logger.debug('Add task %s' % name)
        self.processCreatedSignal.emit(uuid, data)
        self.startNextSignal.emit()
        return uuid
    def start_next(self):
        """
        Start next task
        """
        if self.stopped:
            logger.debug('Process Control Is Stopped')
            return
        if len(self._process_workers) >= self.max_process:
            logger.debug('Process count limit')
            return
        next_file = self.get_next_data()
        if not next_file:
            logger.debug('Pool empty')
            self.queueEmptySignal.emit()
            return
        uuid = os.path.basename(next_file)
        if uuid in self._process_workers:
            logger.error('Meta file already in progress: %s' % uuid)
            self.startNextSignal.emit()
            return
        try:
            with open(next_file) as f:
                task_data = json.load(f)
        except Exception:
            self.move_to_error(next_file, "Can't read meta file")
            self.processErrorSignal.emit(uuid, "Can't read meta file: %s" % next_file)
            return
        next_file = self.move_to_in_progress(next_file)
        process = ProcessWorker(task_data)
        process.outputSignal.connect(lambda msg, uid=uuid: self.process_output(uid, msg))
        process.errorSignal.connect(lambda msg, uid=uuid, f=next_file: self.process_error(uid, msg, f))
        process.closedSignal.connect(lambda code, uid=uuid, f=next_file: self.process_closed(uid, code, f))
        self._process_workers[uuid] = process
        # work dir: QDir.setCurrent changes the whole application's cwd, so restore it right after start
        wd = task_data.get('workdir')
        if wd:
            cur = os.getcwd()
            QDir.setCurrent(wd)
            process.start()
            QDir.setCurrent(cur)
        else:
            process.start()
        self.processStartedSignal.emit(uuid)
        logger.debug('Process started %s' % process)
    def process_output(self, uuid, message):
        """
        Relay process output with uuid
        :param uuid: uuid of task
        :param message: text message
        """
        self.processOutputSignal.emit(uuid, message)
    def process_error(self, uuid, message, data_file):
        """
        Send error of process with uuid
        :param uuid: uuid of task
        :param message: error text
        :param data_file: task data file
        """
        # signal
        self.processErrorSignal.emit(uuid, message)
        # move file
        self.move_to_error(data_file, str(message))
        # kill process
        proc = self._process_workers.get(uuid)
        if proc:
            proc.stop()
    def process_closed(self, uuid, code, data_file):
        """
        Process finished and closed
        :param uuid: uuid of task
        :param code: exit code
        :param data_file: task data file
        """
        logger.debug('Process closed with code %s' % code)
        proc = self._process_workers.get(uuid)
        if not proc:
            logger.error('Completed process %s does not exist in the worker registry' % uuid)
            return
        else:
            proc.deleteLater()
            del self._process_workers[uuid]
        if code == 0:
            # exit normal
            if data_file:
                self.move_to_complete(data_file)
        else:
            # exit error
            self.process_error(uuid, 'Process exited with error code {}. Check output for details.'.format(code),
                               data_file)
        # process finished signal
        self.processFinishedSignal.emit(uuid, code)
        # start next
        self.startNextSignal.emit()
    def __generate_uid(self, data):
        """
        Create unique id from data + random
        :param data: object
        :return: uuid
        """
        # encode for Python 3 compatibility (hashlib requires bytes); harmless on Python 2
        return hashlib.md5(''.join([str(data), str(random.random())]).encode('utf-8')).hexdigest()
    def get_next_data(self):
        """
        Return next data file from queue sorted by creation date
        :return: str path
        """
        queue_dir = os.path.join(self.out_dir, self._folder_queue)
        if not os.path.exists(queue_dir):
            logger.error('Queue dir does not exist')
            return
        first = (sorted([os.path.normpath(os.path.join(queue_dir, x)) for x in os.listdir(queue_dir) if
                         os.path.isfile(os.path.join(queue_dir, x)) and x not in self._process_workers],
                        key=lambda f: os.stat(f).st_ctime) or [None])[0]
        return first
    def move_to_error(self, path, msg=None):
        """
        Move data file to error dir
        :param path: data file path
        :param msg: error message
        :return:
        """
        # add error text to file
        try:
            with open(path) as f:
                data = json.load(f)
            data['error'] = msg
            with open(path, 'w') as f:
                json.dump(data, f, indent=2)
        except Exception as e:
            with open(path, 'a') as f:
                f.write('\n\n{}'.format(str(e)))
        return self._move_to(path, self._folder_error)
    def move_to_complete(self, path):
        return self._move_to(path, self._folder_completed)
    def move_to_in_progress(self, path):
        return self._move_to(path, self._folder_in_progress)
    def _move_to(self, path, folder):
        if not os.path.exists(path):
            return
        new_path = os.path.normpath(os.path.join(self.out_dir, folder, os.path.basename(path)))
        if not os.path.exists(os.path.dirname(new_path)):
            os.mkdir(os.path.dirname(new_path))
        shutil.move(path, new_path)
        return new_path
    @classmethod
    def __open_folder(cls, path):
        """
        Open folder in the system file browser
        :param path: folder path
        :return:
        """
        if os.name == 'nt':
            os.startfile(path)
        elif os.name == 'posix':
            # 'open' is macOS only; Linux desktops use xdg-open (this addresses the original todo)
            opener = 'open' if sys.platform == 'darwin' else 'xdg-open'
            os.system('%s "%s"' % (opener, path))
        elif os.name == 'os2':
            os.system('open "%s"' % path)
    def pause(self):
        """
        Pause the queue without stopping currently running processes
        """
        self.stopped = True
    def resume(self):
        """
        Resume queue
        """
        self.stopped = False
        self.startNextSignal.emit()
class ProcessWorker(QObject):
    """
    This class wraps a single QProcess
    """
    outputSignal = Signal(str)  # output message
    errorSignal = Signal(str)  # error message
    closedSignal = Signal(object)  # exit code
    def __init__(self, data):
        super(ProcessWorker, self).__init__()
        self.data = data
        self.proc = None
        self.name = None
        if not self.data.get('cmd'):
            # nothing is connected to this signal yet at construction time
            self.errorSignal.emit('Command is empty')
            return
        # [p for p in re.split("( |\\\".*?\\\"|'.*?')", c) if p.strip()]
        self.name = self.data.get('name') or os.path.basename(shlex.split(self.data.get('cmd', ''))[0])
    def __repr__(self):
        return ('<ProcessWorker #%s "%s">' % (self.data.get('__uuid'), self.name or 'noname')).strip()
    def start(self):
        """
        Main start method
        """
        self.proc = QProcess()
        self.proc.setProcessChannelMode(QProcess.MergedChannels)
        self.proc.readyRead.connect(self.send_output)
        self.proc.finished.connect(self.send_finished)
        # QProcess.error passes a ProcessError enum; convert to str for the Signal(str)
        self.proc.error.connect(lambda err: self.errorSignal.emit(str(err)))
        env = self.data.get('env')
        if isinstance(env, dict) and env:
            qenv = QProcessEnvironment.systemEnvironment()
            for k, v in env.items():
                qenv.insert(k, v)
            self.proc.setProcessEnvironment(qenv)
        args = self.data.get('args', [])
        if not isinstance(args, list):
            logger.error('Args should be list type')
            args = []
        logger.debug('COMMAND: %s' % ' '.join([self.data.get('cmd')] + [str(x) for x in args]))
        self.proc.start(self.data.get('cmd'), [str(x) for x in args])
    def send_output(self):
        """
        Catch process output
        """
        output = self.proc.readAll()
        if not isinstance(output, str):
            if sys.version_info[0] < 3:
                output = str(output)
            else:
                output = str(output, 'utf-8')
        output = str(output).strip()
        if output:
            # send
            self.outputSignal.emit(output)
    def stop(self):
        """
        Stop process
        """
        if self.proc.isOpen():
            self.proc.kill()
            self.proc.terminate()
    def send_finished(self, code):
        """
        Send signal on process stopped
        :param code: exit code
        """
        self.closedSignal.emit(code)
if __name__ == '__main__':
    # executable test script: the module runs itself as a task to produce some output
    import sys, json, time
    def msg(text):
        sys.stdout.write('\n' + str(text))
        sys.stdout.flush()
    msg('Start process')
    for i in range(0, 100, 20):
        time.sleep(0.5)
        msg('Progress: {}%'.format(i))
        if i > 50:
            if 'error' in sys.argv:
                sys.exit(3)
    result = dict(
        value1=1,
        value2=2
    )
    msg('Result:::{}:::'.format(json.dumps(result)))
    msg('Complete')
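    # Running the demo directly (file name assumed to be process_control.py, as in the TESTS below):
    #   python process_control.py        -> prints progress, a Result::: line, exits 0
    #   python process_control.py error  -> exits with code 3 after the 60% progress message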
######################### TESTS ############################
if False:
    import process_control
    import random
    import time
    from functools import partial
    from PySide.QtCore import QTimer
    PROCESS = ProcessControl()
    def callback(v, *args):
        name = ([k for k in PROCESS.__dict__.keys() if PROCESS.__dict__[k] == v] or [v])[0]
        print('CALLBACK "%s" >> %s' % (name, str(args)))
    for sig in [
            PROCESS.processCreatedSignal,
            PROCESS.processOutputSignal,
            PROCESS.processStartedSignal,
            PROCESS.processResultSignal,
            PROCESS.progressSignal,
            PROCESS.processErrorSignal,
            PROCESS.processFinishedSignal,
            PROCESS.queueEmptySignal
            ]:
        # partial avoids the late-binding problem of a default-argument lambda in a loop
        sig.connect(partial(callback, sig))
    cmd = sys.executable
    args1 = [os.path.normpath(process_control.__file__)]
    args2 = [os.path.normpath(process_control.__file__), 'error']
    env = {'PYTHONPATH': os.getcwd()}
    PROCESS.set_max_process_count(2)
    for i in range(10):
        PROCESS.add_task(cmd,
                         args=random.choice([args1, args2]),
                         name='Test {}'.format(i),
                         env=env,
                         workdir=os.path.dirname(os.path.dirname(process_control.__file__))
                         )
        time.sleep(0.3)
    # test with crash
    # QTimer.singleShot(7000, sys.exit)