Skip to content

Instantly share code, notes, and snippets.

@pcapriotti
Created July 16, 2011 08:54
Show Gist options
  • Save pcapriotti/1086172 to your computer and use it in GitHub Desktop.
Save pcapriotti/1086172 to your computer and use it in GitHub Desktop.
Reactor Framework
"""The reactor framework.
This module introduces the *reactor framework*, a collection of utilities to be
used in conjunction with the greelet library to solve the problem of inversion
of control in event-driven code.
Traditionally, writing event-driven code typically consists of "connecting"
signals to handlers (i.e. callbacks), which are to be invoked by the framework
in use when a certain "event" occurs.
As long as the logic being implemented is simple enough to be easily expressed
in the form of "reactions" to specified events, this model is pretty natural
and straightforward to implement. An example is an application displaying
several buttons, each of which performs a different non-interactive operation.
However, more often than not, the desired reaction to user input depends not
only on the input itself, but also on whatever input was already provided,
either by the user, or by some other previous operations that is now concluded.
In that case, the logic detailing the desired behavior can usually be easily
expressed in a linear fashion, but that doesn't directly translate into the
traditional event-driven model, and it often requires a translation into the
equivalent of a state machine representation or something to that effect, to be
able to be implemented correctly and reliably.
For example, suppose we want to implement a button which will display a message
only after it has been clicked 5 times, and do nothing otherwise.
In the usual event-driven style of programming, we need to introduce a global
state containing the number of times the button has been clicked, increment it,
and check if it reached the desired count at each invocation of the handler.
Using the Qt API as an example, the handler and initialization function would
probably look like this:
def __init__(self):
self.count = 0
self.button.clicked.connect(self.on_button_clicked)
def on_button_clicked(self):
self.count += 1
if self.count == 5:
print "time is a face on the water"
self.button.clicked.disconnect(on_button_clicked)
This is a simple enough example that it doesn't look too bad, but still it's
easy to spot the signs that make such code so hard to follow and maintain when
the complexity is increased even so slightly.
It has all the components of a straightforward for loop (initializing a
counter, incrementing it at each step, checking for termination), but the
iteration itself is hidden in the framework main loop, and we cannot just use
the normal python iteration techniques, because the execution of the loop is
not contained within a single function frame, but is spread throughout multiple
function invocations.
The reactor framework solves this problem with the use of greenlet "threads".
When you use wait_for to connect to a signal, for example, control is
immediately returned to the calling function, but resumed to the point where it
left as soon as the signal is emitted.
This makes the code look like a blocking wait on the signal but without any of
the disadvantages, like actually blocking the main loop, resulting in a frozen
GUI.
For comparison, here is how the previous example could have been implemented
using the reactor framework:
def __init__(self):
for i in xrange(5):
wait_for(self.button.clicked)
print "time is a face on the water"
The reactor framework is inspired by similar libraries available for other
languages and frameworks [1] [2], as well as a number of research articles on
the subject of solving the inversion of control problem with cooperative
multitasking [3] [4].
References:
[1] http://lamp.epfl.ch/~imaier/
[2] http://thesynchronousblog.files.wordpress.com/2009/08/luagravity_sblp.pdf
[3] http://lamp.epfl.ch/~phaller/doc/haller06jmlc.pdf
[4] http://www.stanford.edu/class/cs240/readings/usenix2002-fibers.pdf
"""
from greenlet import greenlet, getcurrent as gself
from decorator import decorator
def react(f):
"""Run the given function f in a new greenlet.
Convenience function to start a new reactive context.
Since wait_for or reactive_handler cannot be used inside the same greenlet
as the main loop, a basic use of this function is to execute the
application main routine, so that it run in its own greenlet, and is able
to take advantage of reactive event handling.
More generally, spawning a new greenlet is useful to make sure handling of
certain signals is performed independently of others.
For example, a code like the following:
wait_for(button1.clicked)
print "button1 clicked"
wait_for(button2.clicked)
print "button2 clicked"
will only react to presses of button1 and button2 in that specific order, whereas
wait_for(button1.clicked, button2.clicked)
print "button1 or button2 clicked"
will react equally to a click on either of the buttons. If we want to react
to buttons in either order, we can use something like the following:
@reactive
def handle_button(button, name):
wait_for(button.clicked)
print name, "clicked"
handle_button(button1, "button1")
handle_button(button2, "button2")
"""
greenlet(f).switch()
@decorator
def reactive(f, *args):
"""Modify a function to spawn a new greenlet when called.
Whenever the wrapped function is called, a new greenlet is spawned and the
function is executed in it.
Note that by using this decorator, you lose the ability to access the
return value of the wrapped function.
"""
react(lambda: f(*args))
def wait_for(*signals, **kwargs):
"""Block the current greenlet on the given signals.
This function is used to interrupt the current greenlet execution until one
of the specified signals is emitted. When that happens, execution is
resumed immediately after the function call, which returns
It is possible to specify a list of extra signals that result in
exceptions, by adding a named parameter 'exceptions' containing a list of
tuples of the form
(signal_name, exception_class).
If a signal contained in that list is emitted first, the corresponding
exception class is instantiated using the signal arguments as parameters,
and raised in the greenlet of the caller after it resumes.
"""
w = waiting_for(*signals, **kwargs)
with w:
pass
return normalize_result(w.result)
class waiting_for(object):
"""A variation of wait_for to be used in a with statement.
Sometimes, we want to make sure we are listening to a certain signal before
performing an operation, but wait_for doesn't allow the possibily of
executing code between when the signal is connected, and when it is
emitted.
This limitation is addressed by using waiting_for inside a with statement.
For example:
with waiting_for(message.sent):
send(message)
print "message sent"
would set up a handler for the message.sent signal, then execute the code
inside the with statement (i.e. send the message), and finally resume
execution of the parent greenlet (e.g. the application main loop).
Eventually, When the signal is emitted, the print statement is executed.
As with wait_for, optional 'exceptional' signals can be specified, that
will result in a raised exception when emitted.
"""
def __init__(self, *signals, **kwargs):
self.signals = signals
self.exceptions = kwargs.get("exceptions", [])
self.result = None
self.exception = None
def __enter__(self):
self.cc = gself()
for signal in self.signals:
signal.connect(self.handler)
for signal, exc in self.exceptions:
signal.connect(self.exception_handler)
return self
def __exit__(self, type, value, traceback):
self.cc.parent.switch()
if not self.exception is None:
raise self.exception
def disconnect_all(self):
for signal in self.signals:
signal.disconnect(self.handler)
for signal, _ in self.exceptions:
signal.disconnect(self.exception_handler)
def handler(self, *args):
self.disconnect_all()
self.result = normalize_result(args)
self.cc.parent = gself()
self.cc.switch()
def exception_handler(self, *args):
self.disconnect_all()
self.exception = exc(*args)
def events(signal):
"""Keep handling a signal indefinitely.
A common signal handling scenario is the need to perform some operation
every time the signal is emitted.
This is achieved normally without the use of reactor framework by simply
connecting the signal to a handler performing the desired operation.
However, there is a simple way to achieve the same result within the
reactor framework by iterating through the generator returned by the events
function as though they were all already available. The reactor framework
takes care of returning control to the main loop after each iteration, and
resuming the loop whenever a new message is available.
There is a number of of reasons why this approach might be preferable to
the direct one:
- consistency
- read/write access to the local frame
- ability to yield from within the handler code
- ability to manipulate the yield generator via combinators or comprehension
For example, the following code:
def incoming_messages(self):
return (e.message for e in events(self.message_received))
will return a generator containing all future incoming messages.
"""
event_args = []
current = gself()
def handler(*args):
event_args[:] = args
current.parent = gself()
current.switch()
signal.connect(handler)
while True:
current.parent.switch()
yield normalize_result(event_args)
signal.disconnect(handler)
class reactive_handler(object):
"""Adaptor to be used as a callback to non-reactive asynchronous function calls.
Many frameworks and libraries expose functions that take one or more
callbacks as arguments, and use them to notify the caller when the
operation is finished and what the result is.
Using a reactive_handler instance inside a with statement allows such a
function to be called within the reactor framework. The function will
execute and return to the main loop, and execution will resume after the
call only when the handler is actually invoked.
The handler may be used multiple times within the with statement, but can
only be called from a different greenlet.
"""
def __init__(self, exception=None):
self.exception = exception
self.result = None
self.base = self
def __enter__(self):
self.cc = gself()
return self
def __exit__(self, type, value, traceback):
if type is None:
if self.cc.parent is None:
raise Exception("reactive_handler cannot be used from the root greenlet")
self.cc.parent.switch()
else:
return
if self.exception: raise self.exception(*self.result)
return False
def __call__(self, *args):
self.base.result = normalize_result(args)
self.base.exception = self.exception
if gself() == self.base.cc:
raise Exception("reactive_handler invoked in the same greenlet where it was created")
self.base.cc.parent = gself()
self.base.cc.switch()
"""Create a special handler which will result in a raised exception when invoked.
When a function takes a handler to be invoked in case of error, the handler
returned by this function can be used. The specified exception class will
be instantiated by the handler and raised after the greenlet is resumed.
"""
def for_error(self, exception=None):
result = self.__class__(exception)
result.base = self
return result
class SignalAdaptor(object):
"""Adaptor for dbus-like signals.
The signals passed to the reactor framework are assumed to have an interface compatible with that of Qt signals in PySide.
This adaptor class allows the use of dbus signals in the reactor framework.
"""
def __init__(self, obj, name):
self.obj = obj
self.name = name
self.connection = None
def connect(self, handler):
if self.connection:
raise Exception("SignalAdaptor does not support more than 1 connection")
self.connection = self.obj.connect_to_signal(self.name, handler)
def disconnect(self, handler):
if self.connection:
self.connection.remove()
def normalize_result(result):
if len(result) == 1:
return result[0]
elif len(result) == 0:
return None
else:
return result
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment