Last active
December 18, 2017 09:03
-
-
Save paulwinex/82c8380b91d8095ed81ed32e29ef8966 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
This class allow you call function in thread and receive many signals about progress | |
""" | |
from PySide.QtCore import * | |
import time, hashlib, random | |
import logging as _logging | |
logger = _logging.getLogger(__name__) | |
class ThreadControl(QObject): | |
""" | |
Main thread control class | |
""" | |
# ################################ SIGNALS | |
# tasks | |
task_createdSignal = Signal(object, object) # task id, task data | |
task_startedSignal = Signal(object, object) # task id, task data | |
task_progressSignal = Signal(object, object) # task id, progress data | |
task_finishedSignal = Signal(object, object) # task id, task data | |
task_errorSignal = Signal(object, object) # task id, message | |
pool_emptySignal = Signal() | |
def __init__(self, max_threads=3): | |
super(ThreadControl, self).__init__() | |
self.max_threads = max_threads | |
# workers | |
self._thread_workers = [] | |
self._pool = [] | |
def __generate_uid(self, data): | |
""" | |
Create unique id from data + random | |
:param data: object | |
:return: uuid | |
""" | |
return hashlib.md5(''.join([str(data), str(random.random())])).hexdigest() | |
def add_task(self, callable_or_string, *args, **kwargs): | |
""" | |
Add new task to pool | |
:param callable_or_string: callable function. | |
Should be accept any **kwargs!!! | |
:param args: [] | |
:param kwargs: {} | |
:return: str (task UUID) | |
""" | |
data = dict( | |
callable=callable_or_string, | |
args=args, | |
kwargs=kwargs, | |
) | |
data['__uuid'] = self.__generate_uid(data) | |
uuid = self.thread_execute(data) | |
return uuid | |
def thread_execute(self, data): | |
""" | |
Add task from raw data | |
:param data: {} | |
:return: | |
""" | |
if not callable(data.get('callable')) and not isinstance(data.get('callable'), (str, unicode)): | |
raise Exception('Callable should be function or name of function in module') | |
self._pool.append(data) | |
if not data.get('__uuid'): | |
data['__uuid'] = self.__generate_uid(data) | |
self.task_createdSignal.emit(data['__uuid'], data) | |
self._start_next_thread() | |
return data['__uuid'] | |
def _start_next_thread(self): | |
""" | |
Start next thread | |
""" | |
# check pool is empty | |
if not self._pool: | |
self.pool_emptySignal.emit() | |
return | |
# check threads limit | |
if len(self._thread_workers) >= self.max_threads: | |
logger.debug('Worker count limit') | |
return | |
# get data | |
data = self._pool.pop(0) | |
# check uuid | |
uuid = data.get('__uuid') | |
if not uuid: | |
self.task_errorSignal.emit('none', 'Task have not UUID: %s' % data) | |
logger.error('Task have not UUID: %s' % data) | |
self._start_next_thread() | |
return | |
# check callable exists | |
if not data.get('callable'): | |
self.task_errorSignal.emit(uuid, 'Callable not set') | |
logger.error('Method not set') | |
self._start_next_thread() | |
return | |
# start new thread | |
t = ThreadExecutor(data) | |
self._thread_workers.append(t) | |
# connect signals | |
t.progressSignal.connect(lambda prg, uid=uuid: self.thread_progress(uid, prg)) | |
t.errorSignal.connect(lambda msg, wrk=t, uid=uuid: self.thread_error(uid, wrk, msg)) | |
t.finishedSignal.connect(lambda res, wrk=t, uid=uuid: self.thread_finished(uid, res, wrk)) | |
t.threadClosedSignal.connect(lambda wrk=t: self.delete_worker(t)) | |
# start | |
t.start() | |
self.task_startedSignal.emit(uuid, data) | |
# start next | |
self._start_next_thread() | |
def delete_worker(self, worker): | |
""" | |
Delete worker on thread terminated signal | |
""" | |
if worker in self._thread_workers: | |
self._thread_workers.remove(worker) | |
self._start_next_thread() | |
def thread_progress(self, uuid, data): | |
""" | |
Emit progress data | |
{ | |
progress: int, # 0-100% | |
activity: str, # init, render, cleanup etc | |
message: str # current operation or something else | |
complete: bool # task is complete even if it does not 100% | |
force: bool # Ignore timeout limit | |
} | |
:param uuid: task uuid | |
:param data: {} | |
""" | |
self.task_progressSignal.emit(uuid, data) | |
def thread_finished(self, uuid, result, worker): | |
""" | |
Send result of function | |
:param uuid: completed task uuid | |
:param result: result of task | |
:param worker: worker object | |
""" | |
self.task_finishedSignal.emit(uuid, result) | |
self._start_next_thread() | |
def thread_error(self, uuid, worker, msg): | |
""" | |
Send error | |
:param uuid: ID of task | |
:param worker: worker | |
:param msg: error text | |
""" | |
self.task_errorSignal.emit(uuid, msg) | |
class ThreadExecutor(QObject): | |
""" | |
This class starts thread | |
""" | |
progressSignal = Signal(object) | |
errorSignal = Signal(object) | |
finishedSignal = Signal(object) | |
threadClosedSignal = Signal() | |
def __repr__(self): | |
return '<ThreadExecutor (%s)>' % self.content.get('__uuid', '-') | |
def __init__(self, content): | |
super(ThreadExecutor, self).__init__() | |
self.content = content | |
self.t = self.w = None | |
def start(self): | |
""" | |
Start callable in thread | |
""" | |
self.t = QThread(self) | |
self.t.terminated.connect(self.threadClosedSignal.emit) | |
self.w = ThreadWorker(self.content) | |
self.w.moveToThread(self.t) | |
self.t.started.connect(self.w.execute_method) | |
self.w.errorSignal.connect(self.error) | |
self.w.finishedSignal.connect(self.finish) | |
self.w.workerProgressSignal.connect(self.progressSignal.emit) | |
self.t.start() | |
def stop(self): | |
""" | |
Stop thread | |
""" | |
self.t.quit() | |
self.t.terminate() | |
def error(self, msg): | |
self.stop() | |
self.errorSignal.emit(msg) | |
def finish(self, result): | |
self.stop() | |
self.finishedSignal.emit(result) | |
class ThreadWorker(QObject): | |
""" | |
this class execute main callable of task | |
""" | |
workerProgressSignal = Signal(object) | |
errorSignal = Signal(object) | |
finishedSignal = Signal(object) | |
def __repr__(self): | |
return '<ThreadWorker (%s)>' % self.data.get('__uuid', '-') | |
def __init__(self, data): | |
super(ThreadWorker, self).__init__() | |
self.data = data | |
self.progress = Progress(self.data) | |
self.progress.progressSignal.connect(self.workerProgressSignal.emit) | |
self.last = 0 | |
def execute_method(self): | |
""" | |
Main execute function | |
:return: result or callable | |
""" | |
method = self.data.get('callable') | |
if not callable(method): | |
if not isinstance(method, (str, unicode)): | |
self.errorSignal.emit('Callable "%s" not found string or function') | |
return | |
method = self.load_method(method) | |
if not method: | |
self.errorSignal.emit('Callable method "%s" not found' % self.data.get('callable')) | |
return | |
try: | |
result = method(progress_callback=self.progress, *self.data.get('args', []), **self.data.get('kwargs', {})) | |
self.finishedSignal.emit(result) | |
except Exception as e: | |
self.errorSignal.emit(e.message) | |
@classmethod | |
def load_method(cls, name): | |
""" | |
Load module from string | |
:param name: path.to.module.function | |
:return: callable object | |
""" | |
# todo: not implemented yet | |
return None | |
class Progress(QObject): | |
""" | |
Use this object to send progress from function | |
""" | |
progressSignal = Signal(object) | |
# limit of sending per time | |
send_timeout_limit = 1 | |
def __init__(self, data): | |
super(Progress, self).__init__() | |
self.__data = data | |
self.last = 0 | |
def progress(self, val): | |
self.send(progress=val) | |
def activity(self, text): | |
self.send(activity=text) | |
def message(self, text): | |
self.send(message=text) | |
def complete(self, message=None): | |
self.send(complete=True, progress=100, message=message, force=True) | |
def send(self, progress=None, activity=None, message=None, complete=False, force=False): | |
""" | |
{ | |
progress: int, # 0-100% | |
activity: str, # init, render, cleanup etc | |
message: str # current operation or something else | |
complete: bool # task is complete even if it does not 100% | |
force: bool # Ignore timeout limit | |
} | |
:param progress: int | |
:param activity: str | |
:param message: str | |
:param complete: bool | |
:param force: bool | |
""" | |
# check timeout limit | |
curr = int(time.time()) | |
if curr < (self.last + self.send_timeout_limit) and not force: | |
return | |
# create data | |
progress_data = {} | |
if progress or complete: | |
if complete: | |
progress_data.update(dict( | |
progress=100 | |
)) | |
else: | |
progress_data.update(dict( | |
progress=progress | |
)) | |
if activity: | |
progress_data.update(dict( | |
activity=activity | |
)) | |
if message: | |
progress_data.update(dict( | |
message=message | |
)) | |
if progress_data: | |
# send | |
self.progressSignal.emit(progress_data) | |
# save last time | |
self.last = curr | |
############################################################################# | |
######### TESTS ############################################################ | |
############################################################################# | |
# ============================ Threads | |
import time, random | |
THREADS = ThreadControl() | |
def callback(v, *args): | |
name = ([k for k in THREADS.__dict__.keys() if THREADS.__dict__[k] == v] or [v])[0] | |
print 'CALLBACK "%s" >> %s' % (name, str(args)) | |
for sig in [ | |
THREADS.task_createdSignal, | |
THREADS.task_startedSignal, | |
THREADS.task_progressSignal, | |
THREADS.task_finishedSignal, | |
THREADS.task_errorSignal, | |
THREADS.pool_emptySignal | |
]: | |
sig.connect(lambda signal=sig, *args: callback(sig, *args)) | |
# simple | |
def simple_func(*args, **kwargs): | |
time.sleep(1) | |
THREADS.add_task(simple_func) | |
# return result | |
def func_with_result(*args, **kwargs): | |
time.sleep(1) | |
import datetime | |
t = datetime.datetime.now() | |
return {'time': t} | |
THREADS.add_task(func_with_result) | |
# function with arguments | |
def func_with_args(x, **kwargs): | |
t = random.randrange(4, 10) * 0.3 | |
time.sleep(t) | |
return 'TASK (%s) %s' % (t, x * 10) | |
THREADS.add_task(func_with_args, 2) | |
# queue | |
for i in range(THREADS.max_threads * 2): | |
THREADS.add_task(func_with_args, i) | |
# progress | |
def func_with_progress(*args, **kwargs): | |
pc = kwargs.get('progress_callback') | |
if pc: | |
pc.activity('Start progress...') | |
for i in range(0, 100, 20): | |
pc.progress(i) | |
time.sleep(1) | |
if i >= 80: | |
pc.message('Almost complete...') | |
pc.complete(message='Congratulations!') | |
return 100500 | |
else: | |
raise Exception('No progress callback') | |
THREADS.add_task(func_with_progress) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment