Created
September 29, 2017 10:09
-
-
Save lanfon72/b968b50ac5f93503736494fb5fd791bd to your computer and use it in GitHub Desktop.
observer for websocket.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
from weakref import WeakSet | |
from json import dumps, loads | |
def _validator(dict_data): | |
""" Default validator function for Observer class. """ | |
return dict_data['event'], dict_data['data'] | |
class BaseRoom:
    """ This class is a base class for inheritance.

    It only holds the registries (joined mates, event listeners, exception
    handlers) and the two dispatchers that read them; it offers no method to
    fill the registries itself.
    """

    #: The class that is used to create observer object to observe a websocket.
    observer_class = None

    def __init__(self):
        #: Use to weak refer all websockets that joined.
        self.mates = WeakSet()
        #: Holds the event as key, async function as listener in value set.
        self._subscribes = dict()
        #: Holds the Exception type as key, async function as handler in value.
        self._except_hdlr = dict()

    def listeners(self, event):
        """ The event dispatcher.

        :param event: The event name to look up.
        :return: the set of listeners registered for `event`, or an empty set
                 when nothing is registered.
        """
        return self._subscribes.get(event, set())

    def except_handler(self, exc, default=None):
        """ The exception dispatcher.

        The lookup walks the method resolution order of `exc`, so the handler
        of the most specific registered class wins: an exact match first, then
        the nearest registered base class (e.g. a handler registered for
        `Exception` also serves `ValueError`).

        :param exc: The exception class to find a handler for.
        :param default: The handler to return when nothing matches. Defaults
                        to :meth:`noop`.
        :return: the handler found, else `default`, else :meth:`noop`.
        """
        handlers = self._except_hdlr
        # Keys that are not classes have no ``__mro__``; fall back to a plain
        # exact lookup for them.
        for klass in getattr(exc, '__mro__', (exc,)):
            if klass in handlers:
                return handlers[klass]
        return default or self.noop

    async def noop(self, ob, exception):
        """ The default function to handle exceptions. It does nothing. """
class Observer:
    """ The observer object implements a event dispatcher binds with websocket.
    Usually, it is living with another `Room` object is derived by `BaseRoom`
    class.

    .. admonition:: What if I need to use this class alone ?

        The class is designed to be created in a `Room` object, if you really
        need to use it alone, remember to register events in the `room` class
        attribute.

    :param ws: the websocket to binds with.
    :param cfg: the configuration dict from web server application.
    :param session: the session dict for this connection. An empty dict will be
                    created by default.
    :param validator: An callable function with a dict-like input, and return a
                      tuple (event_name, event_data). Defaults to access dict
                      key "event" and "data".
    """

    #: The default `room` object which depends on.
    room = BaseRoom()

    def __init__(self, ws, cfg, session=None, validator=None):
        #: Holds the output messages.
        self.tasks = asyncio.Queue()
        #: Use as DI to let JSON dumps/loads replace-able.
        self.dumps, self.loads = dumps, loads
        self.ws, self.cfg = ws, cfg
        # Test against None explicitly: `session or dict()` would silently
        # replace an empty dict owned by the caller with a private one.
        self.session = dict() if session is None else session
        self.validator = validator or _validator

    def _dispatch_exception(self, exc):
        """ Schedule the room's handler registered for the type of `exc`. """
        hdlr = self.room.except_handler(type(exc))
        asyncio.ensure_future(hdlr(self, exc))

    async def trigger(self, event, data):
        """ To trigger an event with the data received. All the exceptions that
        are uncaught will be propagated to the handler registered in room.

        :param event: The event name as string.
        :param data: The data as dict that be propagated.
        """
        fns = self.room.listeners(event)
        futs = [f(self, data) for f in fns]
        # return_exceptions=True keeps one failing listener from hiding the
        # results of the others; failures come back as values.
        for e in await asyncio.gather(*futs, return_exceptions=True):
            if isinstance(e, BaseException):
                self._dispatch_exception(e)

    async def emit(self, event, *args, **kws):
        """ To publish result json to client with specific `event`. `args` will
        be added into the key `args` in `kws`. For example::

            await ob.emit("greeting", "hello", success=True)

        Is equivalent to the following::

            await ob.emit("greeting", args=["hello"], success=True)

        And Client will received the data like::

            {"event": "greeting", "data": {"args": ["hello"], "success": true}}

        :param event: The event name as string.
        :param args: The values that JSON serializable. added to "args" in `kws`.
        :param kws: The key/value should be serializable in JSON.
        """
        # list.extend() returns None, so its result must not be stored. Build
        # a new list instead, which also leaves a caller-owned list untouched.
        merged = list(kws.get('args', ()))
        merged.extend(args)
        kws['args'] = merged
        data = self.dumps(dict(event=event, data=kws))
        await self.tasks.put(data)

    async def serve(self):
        """ To start communicating with client, this function will block until
        websocket closed.

        The reader and the writer run as two tasks; as soon as one of them
        finishes (or this coroutine is cancelled) the other one is cancelled.
        """
        # asyncio.wait() needs tasks/futures: bare coroutines are rejected on
        # recent Python versions.
        tasks = [asyncio.ensure_future(coro)
                 for coro in (self._reader(), self._writer())]
        try:
            await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        finally:
            for task in tasks:
                if not task.done():
                    task.cancel()

    async def _reader(self):
        """ Receive messages while the websocket is open and trigger the event
        each of them names. Failures go to the room's exception handlers.
        """
        ws = self.ws
        while ws.open:
            try:
                data = await ws.recv()
                msg = self.loads(data)
                evt, data = self.validator(msg)
                # Listeners run in the background so that a slow one does not
                # stall the receiving loop.
                asyncio.ensure_future(self.trigger(evt, data))
            except asyncio.CancelledError:
                # Cancellation is not an error to report; let it stop the loop.
                raise
            except Exception as e:
                self._dispatch_exception(e)

    async def _writer(self):
        """ Send the queued output messages while the websocket is open.
        Failures go to the room's exception handlers.
        """
        ws = self.ws
        while ws.open:
            try:
                data = await self.tasks.get()
                # Await the send so that its failure is caught right here
                # instead of being lost in a detached task.
                await ws.send(data)
            except asyncio.CancelledError:
                # Cancellation is not an error to report; let it stop the loop.
                raise
            except Exception as e:
                self._dispatch_exception(e)
class Room(BaseRoom):
    """ A minimal implementation of the BaseRoom class. It adds the methods to
    register event listeners and exception handlers, and to bind websockets.

    .. admonition:: Change the Default Observer Class

        By default a room creates :class:`Observer` objects to connect with
        websockets. Re-assign :attr:`observer_class` with another class (it
        should be derived from :class:`Observer`) to change that default.

        To add just one observer object of another kind to a room, consider
        :meth:`add_roommate` instead.
    """

    #: Use Observer as the default Observer class.
    observer_class = Observer

    def on(self, event):
        """ Decorator form of :meth:`subscribe`: registers the decorated async
        function as a listener of `event` and hands it back unchanged. The
        listener receives the observer object as 1st argument and the data
        from client as 2nd argument. One event may have many listeners::

            @room.on("greeting")
            async def greeting(ob, data):
                await ob.emit("greeting", "hello there")

        :param event: The event name as string.
        """
        def register(listener):
            self.subscribe(event, listener)
            return listener

        return register

    def on_exception(self, exception):
        """ Decorator form of :meth:`register_exception`: registers the
        decorated async function as the handler of `exception` and hands it
        back unchanged. The handler receives the observer object as 1st
        argument and the exception instance as 2nd argument::

            @room.on_exception(IndexError)
            async def index_error(ob, exc):
                await ob.emit("Error", "An exception was caught.", repr(exc))

        :param exception: The exception class to register.
        """
        def register(handler):
            self.register_exception(exception, handler)
            return handler

        return register

    def make_observer(self, websocket, *args, **kws):
        """ Create an instance of :attr:`observer_class` bound to a websocket.

        :param websocket: The websocket to bind.
        :param args, kws: position/keyword arguments delegated to
                          :attr:`observer_class`.
        :return: a new observer object created via :attr:`observer_class`.
        """
        return self.observer_class(websocket, *args, **kws)

    def add_roommate(self, observer):
        """ Bind an observer object to this room and remember it as a mate.

        :param observer: has to be an instance of :class:`Observer`.
        """
        assert isinstance(observer, Observer)
        # Point the observer at this room so its events are dispatched here.
        observer.room = self
        self.mates.add(observer)

    def join(self, ws, *args, **kws):
        """ Register a connected websocket to this room.

        :param ws: The connected websocket.
        :param args: Position arguments delegated to observer class.
        :param kws: Keyword arguments delegated to observer class.
        :return: the new observer object, already added to this room.
        """
        newcomer = self.make_observer(ws, *args, **kws)
        self.add_roommate(newcomer)
        return newcomer

    def subscribe(self, evt, cb):
        """ Register an async callback as a listener of an event. The callback
        receives the observer object as 1st argument and the data from client
        as 2nd argument. One event may have many callbacks.

        Works exactly like the :meth:`on` decorator, so this::

            @room.on("greeting")
            async def greeting(ob, data):
                await ob.emit("greeting", "hello there")

        is equivalent to the following::

            async def greeting(ob, data):
                await ob.emit("greeting", "hello there")

            room.subscribe("greeting", greeting)

        :param evt: The event name as string.
        :param cb: The async function as a callback that accepts two arguments:
                   observer object and data.
        """
        assert callable(cb)
        if evt in self._subscribes:
            callbacks = self._subscribes[evt]
        else:
            callbacks = set()
        callbacks.add(cb)
        self._subscribes[evt] = callbacks

    def register_exception(self, exc, cb):
        """ Register an async callback as the handler of an exception class.
        The callback receives the observer object as 1st argument and the
        exception instance as 2nd argument. Registering the same class again
        replaces the previous handler.

        Works exactly like the :meth:`on_exception` decorator, so this::

            @room.on_exception(IndexError)
            async def index_error(ob, exc):
                await ob.emit("Error", "An exception was caught.", repr(exc))

        is equivalent to the following::

            async def index_error(ob, exc):
                await ob.emit("Error", "An exception was caught.", repr(exc))

            room.register_exception(IndexError, index_error)

        :param exc: The exception class to register.
        :param cb: The async function as a callback that accepts two arguments:
                   observer object and exception.
        """
        assert callable(cb)
        self._except_hdlr[exc] = cb

    def broadcast(self, caller, evt, data):
        """ Trigger an event with the data on every mate of this room except
        the caller. This is not an async function, it returns a Future to be
        awaited.

        :param caller: the observer to leave out, has to be an instance of
                       :class:`Observer`.
        :param evt: The event name as string.
        :param data: The data that propagate to functions.
        :return: the pending Future from `asyncio.gather`.
        """
        assert isinstance(caller, Observer)
        others = self.mates - {caller}
        pending = [mate.trigger(evt, data) for mate in others]
        return asyncio.gather(*pending)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment