Created
July 16, 2011 08:54
Reactor Framework
"""The reactor framework.

This module introduces the *reactor framework*, a collection of utilities to be
used in conjunction with the greenlet library to solve the problem of inversion
of control in event-driven code.

Traditionally, writing event-driven code consists of "connecting" signals to
handlers (i.e. callbacks), which are invoked by the framework in use when a
certain "event" occurs.

As long as the logic being implemented is simple enough to be easily expressed
in the form of "reactions" to specified events, this model is pretty natural
and straightforward to implement. An example is an application displaying
several buttons, each of which performs a different non-interactive operation.

However, more often than not, the desired reaction to user input depends not
only on the input itself, but also on whatever input was already provided,
either by the user or by some previous operation that has now concluded.

In that case, the logic detailing the desired behavior can usually be expressed
easily in a linear fashion, but that doesn't translate directly into the
traditional event-driven model. It often has to be rewritten as the equivalent
of a state machine before it can be implemented correctly and reliably.

For example, suppose we want to implement a button which will display a message
only after it has been clicked 5 times, and do nothing otherwise.

In the usual event-driven style of programming, we need to introduce state that
outlives any single handler invocation, holding the number of times the button
has been clicked. Each invocation of the handler increments it and checks
whether it has reached the desired count.

Using the Qt API as an example, the handler and initialization function would
probably look like this:

    def __init__(self):
        self.count = 0
        self.button.clicked.connect(self.on_button_clicked)

    def on_button_clicked(self):
        self.count += 1
        if self.count == 5:
            print "time is a face on the water"
            self.button.clicked.disconnect(self.on_button_clicked)

This is a simple enough example that it doesn't look too bad, but it's still
easy to spot the signs that make such code so hard to follow and maintain when
the complexity increases ever so slightly.

It has all the components of a straightforward for loop (initializing a
counter, incrementing it at each step, checking for termination), but the
iteration itself is hidden in the framework main loop, and we cannot just use
the normal Python iteration techniques, because the execution of the loop is
not contained within a single function frame, but is spread across multiple
function invocations.

The reactor framework solves this problem with the use of greenlet "threads".
When you use wait_for to wait on a signal, for example, control is immediately
returned to the parent greenlet (typically the main loop), and execution
resumes from the point where it left off as soon as the signal is emitted.

This makes the code look like a blocking wait on the signal, but without any of
the disadvantages, such as actually blocking the main loop and freezing the
GUI.

For comparison, here is how the previous example could have been implemented
using the reactor framework:

    def __init__(self):
        for i in xrange(5):
            wait_for(self.button.clicked)
        print "time is a face on the water"
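
To see why a linear version is possible at all, the same control flow can be
imitated without greenlets. What follows is only a sketch under stated
assumptions: it swaps greenlets for a plain generator, so the wait has to be
spelled as a yield instead of a function call, and Signal, run and five_clicks
are hypothetical names that are not part of this module.

```python
class Signal(object):
    # Hypothetical stand-in for a Qt-style signal (not part of this module).
    def __init__(self):
        self.handlers = []

    def connect(self, handler):
        self.handlers.append(handler)

    def disconnect(self, handler):
        self.handlers.remove(handler)

    def emit(self, *args):
        # Iterate over a copy, since handlers may disconnect themselves.
        for handler in list(self.handlers):
            handler(*args)


def run(coroutine):
    # Drive a generator that yields the signal it wants to wait for.
    def step():
        try:
            signal = next(coroutine)
        except StopIteration:
            return

        def resume(*args):
            signal.disconnect(resume)
            step()

        signal.connect(resume)

    step()


messages = []


def five_clicks(clicked):
    for i in range(5):
        yield clicked  # plays the role of wait_for(clicked)
    messages.append("time is a face on the water")


clicked = Signal()
run(five_clicks(clicked))
for i in range(5):
    clicked.emit()  # the "main loop" delivering five clicks
```

The counter lives in an ordinary local variable of five_clicks, and control goes
back to the code calling emit after every click. Greenlets provide the same
effect without requiring every function on the call path to be a generator.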

The reactor framework is inspired by similar libraries available for other
languages and frameworks [1] [2], as well as a number of research articles on
the subject of solving the inversion of control problem with cooperative
multitasking [3] [4].

References:

[1] http://lamp.epfl.ch/~imaier/
[2] http://thesynchronousblog.files.wordpress.com/2009/08/luagravity_sblp.pdf
[3] http://lamp.epfl.ch/~phaller/doc/haller06jmlc.pdf
[4] http://www.stanford.edu/class/cs240/readings/usenix2002-fibers.pdf
"""

from greenlet import greenlet, getcurrent as gself
from decorator import decorator


def react(f):
    """Run the given function f in a new greenlet.

    Convenience function to start a new reactive context.

    Since neither wait_for nor reactive_handler can be used inside the same
    greenlet as the main loop, a basic use of this function is to execute the
    application main routine, so that it runs in its own greenlet and is able
    to take advantage of reactive event handling.

    More generally, spawning a new greenlet is useful to make sure handling of
    certain signals is performed independently of others.

    For example, code like the following:

        wait_for(button1.clicked)
        print "button1 clicked"
        wait_for(button2.clicked)
        print "button2 clicked"

    will only react to presses of button1 and button2 in that specific order,
    whereas

        wait_for(button1.clicked, button2.clicked)
        print "button1 or button2 clicked"

    will react equally to a click on either of the buttons. If we want to react
    to both buttons, in either order, we can use something like the following:

        @reactive
        def handle_button(button, name):
            wait_for(button.clicked)
            print name, "clicked"

        handle_button(button1, "button1")
        handle_button(button2, "button2")
    """
    greenlet(f).switch()


@decorator
def reactive(f, *args, **kwargs):
    """Modify a function to spawn a new greenlet when called.

    Whenever the wrapped function is called, a new greenlet is spawned and the
    function is executed in it.

    Note that by using this decorator, you lose the ability to access the
    return value of the wrapped function.
    """
    # forward keyword arguments too, so wrapped functions may accept them
    react(lambda: f(*args, **kwargs))


def wait_for(*signals, **kwargs):
    """Block the current greenlet on the given signals.

    This function is used to interrupt the current greenlet execution until one
    of the specified signals is emitted. When that happens, execution is
    resumed immediately after the function call, which returns the arguments
    the signal was emitted with: None if there were none, the bare value if
    there was exactly one, and the whole tuple otherwise.

    It is possible to specify a list of extra signals that result in
    exceptions, by adding a named parameter 'exceptions' containing a list of
    tuples of the form

        (signal, exception_class).

    If a signal contained in that list is emitted first, the corresponding
    exception class is instantiated using the signal arguments as parameters,
    and raised in the greenlet of the caller after it resumes.
    """
    w = waiting_for(*signals, **kwargs)
    with w:
        pass
    # w.result was already normalized by the handler; normalizing it a second
    # time would fail on None or on a single non-sequence argument
    return w.result


class waiting_for(object):
    """A variation of wait_for to be used in a with statement.

    Sometimes, we want to make sure we are listening to a certain signal before
    performing an operation, but wait_for doesn't allow the possibility of
    executing code between when the signal is connected and when it is
    emitted.
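
Why the order matters can be shown with a small stand-alone sketch. It assumes
an operation that emits its completion signal synchronously. Signal and send
are hypothetical stand-ins, not part of this module or of any real library.

```python
class Signal(object):
    # Hypothetical stand-in for a Qt-style signal (not part of this module).
    def __init__(self):
        self.handlers = []

    def connect(self, handler):
        self.handlers.append(handler)

    def emit(self, *args):
        for handler in list(self.handlers):
            handler(*args)


def send(message, sent):
    # Hypothetical operation that reports completion synchronously.
    sent.emit(message)


late, early = [], []

# Connecting after the operation: the emission has already happened.
sent = Signal()
send("hello", sent)
sent.connect(late.append)

# Connecting before the operation, which is the order waiting_for
# enforces by connecting in __enter__ and running the body afterwards.
sent = Signal()
sent.connect(early.append)
send("hello", sent)
```

In the first case the handler never fires, so a greenlet waiting on the signal
would never be resumed. In the second case the emission is observed.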

    This limitation is addressed by using waiting_for inside a with statement.
    For example:

        with waiting_for(message.sent):
            send(message)
        print "message sent"

    would set up a handler for the message.sent signal, then execute the code
    inside the with statement (i.e. send the message), and finally resume
    execution of the parent greenlet (e.g. the application main loop).
    Eventually, when the signal is emitted, the print statement is executed.

    As with wait_for, optional 'exceptional' signals can be specified, that
    will result in a raised exception when emitted.
    """
    def __init__(self, *signals, **kwargs):
        self.signals = signals
        self.exceptions = kwargs.get("exceptions", [])
        self.result = None
        self.exception = None
        # one handler per exceptional signal, so that each handler knows
        # which exception class to instantiate
        self.exception_handlers = [
            (signal, self.make_exception_handler(exc))
            for signal, exc in self.exceptions]

    def __enter__(self):
        self.cc = gself()
        for signal in self.signals:
            signal.connect(self.handler)
        for signal, handler in self.exception_handlers:
            signal.connect(handler)
        return self

    def __exit__(self, type, value, traceback):
        self.cc.parent.switch()
        if self.exception is not None:
            raise self.exception

    def disconnect_all(self):
        for signal in self.signals:
            signal.disconnect(self.handler)
        for signal, handler in self.exception_handlers:
            signal.disconnect(handler)

    def resume(self):
        # switch back into the waiting greenlet
        self.cc.parent = gself()
        self.cc.switch()

    def handler(self, *args):
        self.disconnect_all()
        self.result = normalize_result(args)
        self.resume()

    def make_exception_handler(self, exc):
        def exception_handler(*args):
            self.disconnect_all()
            self.exception = exc(*args)
            self.resume()
        return exception_handler


def events(signal):
    """Keep handling a signal indefinitely.

    A common signal handling scenario is the need to perform some operation
    every time the signal is emitted.
    Without the reactor framework, this is normally achieved by simply
    connecting the signal to a handler performing the desired operation.

    However, there is a simple way to achieve the same result within the
    reactor framework: iterate through the generator returned by the events
    function as though all the events were already available. The reactor
    framework takes care of returning control to the main loop after each
    iteration, and resuming the loop whenever a new event is available.

    There are a number of reasons why this approach might be preferable to
    the direct one:

     - consistency
     - read/write access to the local frame
     - ability to yield from within the handler code
     - ability to manipulate the resulting generator via combinators or
       comprehensions
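
The last point can be illustrated without the framework. In the sketch below,
fake_events is a hypothetical stand-in for events(signal) that produces an
endless stream of made-up events. It only shows how generator expressions and
itertools compose lazily over such a stream, not how events itself behaves.

```python
from collections import namedtuple
from itertools import islice

# Hypothetical event type carrying a single 'message' attribute.
Event = namedtuple("Event", ["message"])


def fake_events():
    # Hypothetical stand-in for events(signal): an endless stream of events.
    n = 0
    while True:
        n += 1
        yield Event(message="message %d" % n)


def incoming_messages():
    # A comprehension over the stream; nothing is consumed until iteration.
    return (e.message for e in fake_events())


# islice bounds the otherwise infinite iteration to the first three items.
first_three = list(islice(incoming_messages(), 3))
```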

    For example, the following code:

        def incoming_messages(self):
            return (e.message for e in events(self.message_received))

    will return a generator containing all future incoming messages.
    """
    event_args = []
    current = gself()

    def handler(*args):
        event_args[:] = args
        current.parent = gself()
        current.switch()

    signal.connect(handler)
    try:
        while True:
            current.parent.switch()
            yield normalize_result(event_args)
    finally:
        # reached when the generator is closed; a disconnect placed after
        # the infinite loop would never run
        signal.disconnect(handler)


class reactive_handler(object):
    """Adaptor to be used as a callback to non-reactive asynchronous function calls.

    Many frameworks and libraries expose functions that take one or more
    callbacks as arguments, and use them to notify the caller when the
    operation is finished and what the result is.

    Using a reactive_handler instance inside a with statement allows such a
    function to be called within the reactor framework. The function will
    execute and return to the main loop, and execution will resume after the
    call only when the handler is actually invoked.

    The handler may be used multiple times within the with statement, but can
    only be called from a different greenlet.
    """
    def __init__(self, exception=None):
        self.exception = exception
        self.result = None
        self.args = ()
        self.base = self

    def __enter__(self):
        self.cc = gself()
        return self

    def __exit__(self, type, value, traceback):
        if type is not None:
            # the body raised: let the exception propagate
            return False
        if self.cc.parent is None:
            raise Exception("reactive_handler cannot be used from the root greenlet")
        self.cc.parent.switch()
        if self.exception:
            # use the raw callback arguments, not the normalized result
            raise self.exception(*self.args)
        return False

    def __call__(self, *args):
        self.base.args = args
        self.base.result = normalize_result(args)
        self.base.exception = self.exception
        if gself() is self.base.cc:
            raise Exception("reactive_handler invoked in the same greenlet where it was created")
        self.base.cc.parent = gself()
        self.base.cc.switch()

    def for_error(self, exception=None):
        """Create a special handler which will result in a raised exception when invoked.

        When a function takes a handler to be invoked in case of error, the
        handler returned by this function can be used. The specified exception
        class will be instantiated by the handler and raised after the greenlet
        is resumed.
        """
        result = self.__class__(exception)
        result.base = self
        return result


class SignalAdaptor(object):
    """Adaptor for dbus-like signals.

    The signals passed to the reactor framework are assumed to have an
    interface compatible with that of Qt signals in PySide.
    This adaptor class allows the use of dbus signals in the reactor framework.
    """
    def __init__(self, obj, name):
        self.obj = obj
        self.name = name
        self.connection = None

    def connect(self, handler):
        if self.connection:
            raise Exception("SignalAdaptor does not support more than 1 connection")
        self.connection = self.obj.connect_to_signal(self.name, handler)

    def disconnect(self, handler):
        if self.connection:
            self.connection.remove()
            # forget the connection so the adaptor can be connected again
            self.connection = None
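

def normalize_result(result):
    """Unwrap a sequence of signal arguments.

    Return None for no arguments, the argument itself if there is exactly
    one, and the sequence unchanged otherwise.
    """
    if len(result) == 1:
        return result[0]
    elif len(result) == 0:
        return None
    else:
        return result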