Skip to content

Instantly share code, notes, and snippets.

@MaxMorais
Created August 7, 2017 12:55
Show Gist options
  • Save MaxMorais/ddbf4dd42e38772887f0d97d5a119842 to your computer and use it in GitHub Desktop.
Save MaxMorais/ddbf4dd42e38772887f0d97d5a119842 to your computer and use it in GitHub Desktop.
class Method(object):
    """Descriptor that turns attribute access into a remote method call.

    Accessing the descriptor on an instance yields a callable. For methods
    that do not return a value (``rt`` false) the callable builds a call
    string of the form ``<identifier>.<alias>('<name>', <json kwargs>);``
    and hands it to ``instance.get_app().execute``. For methods that do
    return a value (``rt`` true) a ``MethodResultWrapper`` bound to the
    instance and to this descriptor is returned instead.

    The owning instance is expected to provide ``identifier``, ``alias``
    and ``get_app()``.
    NOTE(review): the call string is presumably JavaScript evaluated on the
    client side -- confirm against what ``execute`` does.

    Args:
        name: Name of the remote method placed in the call string.
        args: Ordered parameter names; positional call arguments are paired
            with these names to build the keyword payload.
        rt: Whether the remote method returns a value.
    """

    __slots__ = ('name', 'args', 'rt')

    def __init__(self, name=None, args=(), rt=False):
        super(Method, self).__init__()
        # Keep the name supplied by the caller. It used to be overwritten
        # with None, which put the literal text 'None' in every call string.
        self.name = name
        self.args = args
        self.rt = rt  # Function returns value?

    def __get__(self, instance, owner):
        """Return the descriptor itself on class access, a callable otherwise."""
        if instance is None:
            return self

        if self.rt:
            return MethodResultWrapper(instance, self)

        def wrapper(*args, **kwargs):
            # Positional arguments are mapped onto the declared parameter
            # names; zip() drops any positional argument without a name.
            # Explicit keyword arguments win over positional ones.
            k = dict(zip(self.args, args))
            k.update(kwargs)
            instance.get_app().execute(
                "{id}.{alias}('{method}', {kwargs});".format(
                    id=instance.identifier,
                    alias=instance.alias,
                    method=self.name,
                    kwargs=json.dumps(k)
                )
            )

        return wrapper
class MethodResultTimeout(Exception):
    """Signal that a method result did not arrive within the allowed time."""
class MethodResultWrapper(object):
    """Callable handle for a remote method whose result arrives later.

    Calling the wrapper subscribes to a per-wrapper pubsub topic, starts a
    helper thread that finishes once a result has been queued, and sends
    the call string through ``widget.get_app().execute``. The result is
    collected afterwards with ``wait()``.

    The widget is expected to provide ``identifier``, ``alias``, ``pubsub``
    (with ``create_topic`` and ``subscribe``) and ``get_app()``; the method
    object is expected to provide ``name`` and ``args``.

    Args:
        widget: Object the remote method is invoked on.
        method: Descriptor describing the remote method.
    """

    def __init__(self, widget, method):
        self.widget = widget
        self.method = method
        # The object's id doubles as the name of the pubsub topic.
        self.identifier = str(id(self))
        self.future_result = Queue()

    def _subscribe_to_topic(self):
        """Create this wrapper's topic and route its messages to the queue."""
        self.widget.pubsub.create_topic(self.identifier)
        self.widget.pubsub.subscribe(self.identifier, self._handle_data)

    def _start_thread(self):
        """Start the helper thread that ends once a result is queued."""
        self.thread = Thread(
            target=self._run,
            kwargs={'future_result': self.future_result})
        # Daemon thread: a call that is never answered must not keep the
        # interpreter alive at shutdown.
        self.thread.daemon = True
        self.thread.start()

    def _clientside_answer(self, args, kwargs):
        """Build the call string and pass it to the application.

        Args:
            args: Positional call arguments, paired with ``method.args``.
            kwargs: Keyword call arguments; they override positional ones.
        """
        k = dict(zip(self.method.args, args))
        k.update(kwargs)
        # The alias lives on the widget; the method descriptor only carries
        # name, args and rt.
        call = "{id}.{alias}('{method}', {kwargs})".format(
            id=self.widget.identifier,
            alias=self.widget.alias,
            method=self.method.name,
            kwargs=json.dumps(k)
        )
        # Literal braces are doubled so str.format leaves them in place.
        # NOTE(review): 'set_async_result' is kept from the original text;
        # confirm it matches the handler expected by the receiving side.
        js = "sendCallbackParams('{topic}', 'set_async_result', {{'data': {s}}});".format(
            topic=self.identifier,
            s=call
        )
        self.widget.get_app().execute(js)

    def _handle_data(self, data):
        """Pubsub callback: store the received result for ``wait()``."""
        self.future_result.put(data)

    def __call__(self, *args, **kwargs):
        """Issue the remote call and return this wrapper for ``wait()``."""
        self._subscribe_to_topic()
        self._start_thread()
        self._clientside_answer(args, kwargs)
        return self

    def wait(self, timeout=.5):
        """Block until the result is available and return it.

        Args:
            timeout: Maximum number of seconds to wait for the result.

        Returns:
            The data delivered to this wrapper's topic.

        Raises:
            MethodResultTimeout: If no result arrived within ``timeout``.
        """
        self.thread.join(timeout)
        if self.thread.is_alive():
            raise MethodResultTimeout
        result = self.future_result.get()
        self.future_result.task_done()
        return result

    @staticmethod
    def _run(future_result):
        """Return once ``future_result`` holds at least one item."""
        import time

        # Poll with a short sleep instead of spinning at full CPU.
        while future_result.empty():
            time.sleep(0.01)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment