Skip to content

Instantly share code, notes, and snippets.

@paulwinex
Last active December 18, 2017 09:03
Show Gist options
  • Save paulwinex/82c8380b91d8095ed81ed32e29ef8966 to your computer and use it in GitHub Desktop.
Save paulwinex/82c8380b91d8095ed81ed32e29ef8966 to your computer and use it in GitHub Desktop.
"""
This class allow you call function in thread and receive many signals about progress
"""
from PySide.QtCore import *
import time, hashlib, random
import logging as _logging
logger = _logging.getLogger(__name__)
class ThreadControl(QObject):
    """
    Main thread control class.

    Holds a first-in/first-out pool of pending tasks and runs at most
    ``max_threads`` of them at the same time, each one through its own
    ThreadExecutor. Task events are re-emitted through the signals below,
    always with the task uuid as first argument.
    """
    # ################################ SIGNALS
    # tasks
    task_createdSignal = Signal(object, object)   # task id, task data
    task_startedSignal = Signal(object, object)   # task id, task data
    task_progressSignal = Signal(object, object)  # task id, progress data
    task_finishedSignal = Signal(object, object)  # task id, task result
    task_errorSignal = Signal(object, object)     # task id, message
    # emitted every time the scheduler runs and finds no pending task
    pool_emptySignal = Signal()

    # text types accepted as "name of a function" (``unicode`` is Python 2 only)
    try:
        _string_types = (str, unicode)
    except NameError:
        _string_types = (str,)

    def __init__(self, max_threads=3):
        """
        :param max_threads: int, maximum number of workers running at once
        """
        super(ThreadControl, self).__init__()
        self.max_threads = max_threads
        # workers
        self._thread_workers = []   # ThreadExecutor objects currently alive
        self._pool = []             # task dicts waiting for a free worker

    def __generate_uid(self, data):
        """
        Create unique id from data + random
        :param data: object
        :return: str, md5 hex digest
        """
        seed = ''.join([str(data), str(random.random())])
        if not isinstance(seed, bytes):
            # hashlib only accepts bytes; on Python 2 ``str`` already is bytes
            seed = seed.encode('utf-8')
        return hashlib.md5(seed).hexdigest()

    def add_task(self, callable_or_string, *args, **kwargs):
        """
        Add new task to pool
        :param callable_or_string: callable function or name of function.
            The callable must accept arbitrary **kwargs because the worker
            always passes it a ``progress_callback`` keyword argument.
        :param args: positional arguments for the callable
        :param kwargs: keyword arguments for the callable
        :return: str (task UUID)
        """
        data = dict(
            callable=callable_or_string,
            args=args,
            kwargs=kwargs,
        )
        data['__uuid'] = self.__generate_uid(data)
        return self.thread_execute(data)

    def thread_execute(self, data):
        """
        Add task from raw data
        :param data: dict with keys ``callable``, ``args``, ``kwargs`` and
            optionally ``__uuid`` (generated here when missing)
        :return: str (task UUID)
        :raise Exception: ``callable`` is neither callable nor a string
        """
        target = data.get('callable')
        if not callable(target) and not isinstance(target, self._string_types):
            raise Exception('Callable should be function or name of function in module')
        self._pool.append(data)
        if not data.get('__uuid'):
            data['__uuid'] = self.__generate_uid(data)
        self.task_createdSignal.emit(data['__uuid'], data)
        self._start_next_thread()
        return data['__uuid']

    def _start_next_thread(self):
        """
        Start pending tasks until the pool is empty or the worker limit
        is reached. Invalid tasks are reported through task_errorSignal
        and skipped.
        """
        while True:
            # check pool is empty
            if not self._pool:
                self.pool_emptySignal.emit()
                return
            # check threads limit
            if len(self._thread_workers) >= self.max_threads:
                logger.debug('Worker count limit')
                return
            # get data
            data = self._pool.pop(0)
            # check uuid
            task_id = data.get('__uuid')
            if not task_id:
                self.task_errorSignal.emit('none', 'Task have not UUID: %s' % data)
                logger.error('Task have not UUID: %s', data)
                continue
            # check callable exists
            if not data.get('callable'):
                self.task_errorSignal.emit(task_id, 'Callable not set')
                logger.error('Method not set')
                continue
            # start new thread
            t = ThreadExecutor(data)
            self._thread_workers.append(t)
            # connect signals; default arguments bind the current task id and
            # worker to each lambda at definition time
            t.progressSignal.connect(lambda prg, uid=task_id: self.thread_progress(uid, prg))
            t.errorSignal.connect(lambda msg, wrk=t, uid=task_id: self.thread_error(uid, wrk, msg))
            t.finishedSignal.connect(lambda res, wrk=t, uid=task_id: self.thread_finished(uid, res, wrk))
            t.threadClosedSignal.connect(lambda wrk=t: self.delete_worker(wrk))
            # start
            t.start()
            self.task_startedSignal.emit(task_id, data)

    def delete_worker(self, worker):
        """
        Forget a worker whose thread was closed and schedule the next task
        :param worker: ThreadExecutor
        """
        if worker in self._thread_workers:
            self._thread_workers.remove(worker)
        self._start_next_thread()

    def thread_progress(self, uuid, data):
        """
        Re-emit progress data of a task through task_progressSignal.
        The dict is built by Progress.send and contains only the keys that
        were given a value: ``progress``, ``activity``, ``message``.
        :param uuid: task uuid
        :param data: {}
        """
        self.task_progressSignal.emit(uuid, data)

    def thread_finished(self, uuid, result, worker):
        """
        Send result of function and try to start the next task
        :param uuid: completed task uuid
        :param result: result of task
        :param worker: worker object (not used here)
        """
        self.task_finishedSignal.emit(uuid, result)
        self._start_next_thread()

    def thread_error(self, uuid, worker, msg):
        """
        Send error
        :param uuid: ID of task
        :param worker: worker (not used here)
        :param msg: error text
        """
        self.task_errorSignal.emit(uuid, msg)
class ThreadExecutor(QObject):
    """
    This class starts a thread for one task: it creates a QThread, moves a
    ThreadWorker into it and forwards the worker signals.
    """
    progressSignal = Signal(object)   # progress data from the task
    errorSignal = Signal(object)      # error message
    finishedSignal = Signal(object)   # result of the callable
    threadClosedSignal = Signal()     # the QThread is not running any more

    # milliseconds stop() waits for a clean exit before killing the thread
    quit_timeout = 1000

    def __repr__(self):
        return '<ThreadExecutor (%s)>' % self.content.get('__uuid', '-')

    def __init__(self, content):
        """
        :param content: task data dict (callable, args, kwargs, __uuid)
        """
        super(ThreadExecutor, self).__init__()
        self.content = content
        # QThread and ThreadWorker, both created in start()
        self.t = self.w = None

    def start(self):
        """
        Start callable in thread
        """
        self.t = QThread(self)
        # ``finished`` is used instead of ``terminated`` so the owner is also
        # notified when the thread exits cleanly after quit()
        self.t.finished.connect(self.threadClosedSignal.emit)
        self.w = ThreadWorker(self.content)
        self.w.moveToThread(self.t)
        self.t.started.connect(self.w.execute_method)
        self.w.errorSignal.connect(self.error)
        self.w.finishedSignal.connect(self.finish)
        self.w.workerProgressSignal.connect(self.progressSignal.emit)
        self.t.start()

    def stop(self):
        """
        Stop thread. Asks the thread event loop to quit and waits up to
        ``quit_timeout`` ms; the thread is force-terminated only if it
        does not exit in that time.
        """
        if self.t is None:
            # start() was never called, nothing to stop
            return
        self.t.quit()
        if not self.t.wait(self.quit_timeout):
            # last resort: terminate() may kill the thread at any point
            self.t.terminate()
            self.t.wait()

    def error(self, msg):
        """
        Stop the thread and forward the error message
        :param msg: error text
        """
        self.stop()
        self.errorSignal.emit(msg)

    def finish(self, result):
        """
        Stop the thread and forward the result
        :param result: value returned by the callable
        """
        self.stop()
        self.finishedSignal.emit(result)
class ThreadWorker(QObject):
    """
    This class executes the main callable of a task and reports the result,
    an error message or progress through its signals.
    """
    workerProgressSignal = Signal(object)  # progress data dict
    errorSignal = Signal(object)           # error message
    finishedSignal = Signal(object)        # result of the callable

    # text types accepted as "name of a function" (``unicode`` is Python 2 only)
    try:
        _string_types = (str, unicode)
    except NameError:
        _string_types = (str,)

    def __repr__(self):
        return '<ThreadWorker (%s)>' % self.data.get('__uuid', '-')

    def __init__(self, data):
        """
        :param data: task data dict (callable, args, kwargs, __uuid)
        """
        super(ThreadWorker, self).__init__()
        self.data = data
        # object handed to the callable as ``progress_callback``
        self.progress = Progress(self.data)
        self.progress.progressSignal.connect(self.workerProgressSignal.emit)
        self.last = 0

    def execute_method(self):
        """
        Main execute function. Calls the task callable with the stored
        args/kwargs plus ``progress_callback`` and emits finishedSignal with
        the result, or errorSignal with a message when the callable cannot
        be resolved or raises.
        """
        method = self.data.get('callable')
        if not callable(method):
            if not isinstance(method, self._string_types):
                # the original message was emitted with a raw, unfilled "%s"
                self.errorSignal.emit('Callable "%s" is not a string or function' % (method,))
                return
            method = self.load_method(method)
            if not method:
                self.errorSignal.emit('Callable method "%s" not found' % self.data.get('callable'))
                return
        try:
            result = method(
                *self.data.get('args', []),
                progress_callback=self.progress,
                **self.data.get('kwargs', {})
            )
        except Exception as e:
            # ``message`` exists only on Python 2 and is empty for exceptions
            # created with several arguments, so fall back to str(e)
            self.errorSignal.emit(getattr(e, 'message', '') or str(e))
        else:
            self.finishedSignal.emit(result)

    @classmethod
    def load_method(cls, name):
        """
        Load callable from string
        :param name: path.to.module.function
        :return: callable object; currently always None
        """
        # todo: not implemented yet
        return None
class Progress(QObject):
    """
    Use this object to send progress from function.
    Every accepted update is emitted through progressSignal as a dict.
    """
    progressSignal = Signal(object)
    # minimal interval in seconds between two emitted updates
    send_timeout_limit = 1

    def __init__(self, data):
        """
        :param data: task data dict (stored, not read by this class)
        """
        super(Progress, self).__init__()
        self.__data = data
        # time of the last emitted update, whole seconds
        self.last = 0

    def progress(self, val):
        """Send progress value (0-100)."""
        self.send(progress=val)

    def activity(self, text):
        """Send name of current activity."""
        self.send(activity=text)

    def message(self, text):
        """Send text message."""
        self.send(message=text)

    def complete(self, message=None):
        """Send 100% progress with optional message, ignoring timeout limit."""
        self.send(complete=True, progress=100, message=message, force=True)

    def send(self, progress=None, activity=None, message=None, complete=False, force=False):
        """
        Emit a progress update unless it comes sooner than
        ``send_timeout_limit`` seconds after the previous one.

        Emitted dict contains only the keys that were given a value:
        {
            progress: int,      # 0-100%, always 100 when complete is set
            activity: str,      # init, render, cleanup etc
            message: str        # current operation or something else
        }
        :param progress: int, 0 is a valid value; None means "not given"
        :param activity: str
        :param message: str
        :param complete: bool, task is complete even if progress is not 100
        :param force: bool, ignore timeout limit
        """
        # check timeout limit
        curr = int(time.time())
        if not force and curr < (self.last + self.send_timeout_limit):
            return
        # create data
        progress_data = {}
        if complete:
            progress_data['progress'] = 100
        elif progress is not None:
            # compare with None so a legitimate 0% is not dropped
            progress_data['progress'] = progress
        if activity:
            progress_data['activity'] = activity
        if message:
            progress_data['message'] = message
        if progress_data:
            # send
            self.progressSignal.emit(progress_data)
            # save last time
            self.last = curr
#############################################################################
######### TESTS ############################################################
#############################################################################
# ============================ Threads
import time, random
THREADS = ThreadControl()


def callback(v, *args):
    """
    Print a received signal and the values it was emitted with.
    :param v: signal object; its attribute name is looked up in THREADS,
        when nothing matches ``v`` itself is printed
    :param args: values emitted by the signal
    """
    matches = [k for k, val in THREADS.__dict__.items() if val == v]
    name = matches[0] if matches else v
    # single-argument call form prints the same on Python 2 and Python 3
    print('CALLBACK "%s" >> %s' % (name, str(args)))


def _make_reporter(signal):
    """
    Return a slot that forwards every emitted value to callback() together
    with ``signal``. A factory is used instead of a lambda in the loop
    because a lambda would look up the loop variable only when called (all
    slots would report the last signal), and a ``signal=sig`` default
    parameter would be overwritten by the first emitted value.
    """
    def report(*args):
        callback(signal, *args)
    return report


for sig in [
    THREADS.task_createdSignal,
    THREADS.task_startedSignal,
    THREADS.task_progressSignal,
    THREADS.task_finishedSignal,
    THREADS.task_errorSignal,
    THREADS.pool_emptySignal,
]:
    sig.connect(_make_reporter(sig))
# simple
def simple_func(*args, **kwargs):
    """Minimal task: ignore all arguments and block for one second."""
    seconds = 1
    time.sleep(seconds)


THREADS.add_task(simple_func)
# return result
def func_with_result(*args, **kwargs):
    """
    Task with a result: wait one second and return the current time.
    :return: {'time': datetime.datetime}
    """
    import datetime
    time.sleep(1)
    return {'time': datetime.datetime.now()}


THREADS.add_task(func_with_result)
# function with arguments
def func_with_args(x, **kwargs):
    """
    Task with a positional argument: sleep for a random time and return
    a text made of the sleep time and ``x * 10``.
    :param x: value passed through add_task
    :return: str
    """
    delay = random.randrange(4, 10) * 0.3
    time.sleep(delay)
    report = (delay, x * 10)
    return 'TASK (%s) %s' % report


THREADS.add_task(func_with_args, 2)

# queue: twice as many tasks as the worker limit, so part of them has to wait
for i in range(THREADS.max_threads * 2):
    THREADS.add_task(func_with_args, i)
# progress
def func_with_progress(*args, **kwargs):
    """
    Task that reports its progress through the ``progress_callback``
    keyword argument.
    :return: int
    :raise Exception: no progress callback was passed
    """
    reporter = kwargs.get('progress_callback')
    if not reporter:
        raise Exception('No progress callback')
    reporter.activity('Start progress...')
    for percent in range(0, 100, 20):
        reporter.progress(percent)
        time.sleep(1)
        if percent >= 80:
            reporter.message('Almost complete...')
    reporter.complete(message='Congratulations!')
    return 100500


THREADS.add_task(func_with_progress)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment