Skip to content

Instantly share code, notes, and snippets.

@lanfon72
Created September 29, 2017 10:09
Show Gist options
  • Save lanfon72/b968b50ac5f93503736494fb5fd791bd to your computer and use it in GitHub Desktop.
Save lanfon72/b968b50ac5f93503736494fb5fd791bd to your computer and use it in GitHub Desktop.
observer for websocket.
import asyncio
from weakref import WeakSet
from json import dumps, loads
def _validator(dict_data):
    """ Default validator function for Observer class.

    Splits a decoded message into its event name and payload by plain item
    access; a failing lookup propagates to the caller unchanged.

    :param dict_data: the decoded message, has to support item access with the
                      keys "event" and "data".
    :return: the tuple ``(dict_data['event'], dict_data['data'])``.
    """
    return dict_data['event'], dict_data['data']
class BaseRoom:
    """ This class is a base class for inheritance.

    It only holds the registries and the look-up (dispatch) methods; nothing
    in this class adds entries to the registries.
    """

    #: The class that is used to create observer object to observe a websocket.
    observer_class = None

    def __init__(self):
        #: Use to weak refer all websockets that joined.
        self.mates = WeakSet()
        #: Holds the event as key, async function as listener in value set.
        self._subscribes = dict()
        #: Holds the Exception type as key, async function as handler in value.
        self._except_hdlr = dict()

    def listeners(self, event):
        """ The event dispatcher.

        :param event: The event name to look up.
        :return: the set of listeners stored for `event`, or a new empty set
                 when nothing is stored for it.
        """
        return self._subscribes.get(event, set())

    def except_handler(self, exc, default=None):
        """ The exception dispatcher.

        The handler stored for `exc` itself is preferred; otherwise the base
        classes of `exc` are tried in method resolution order, so a handler
        stored for ``LookupError`` also serves ``KeyError`` (the same matching
        an ``except`` clause performs).

        :param exc: The exception class to find a handler for.
        :param default: The handler to return when nothing matches. Defaults
                        to :meth:`noop`.
        :return: the matching handler, else `default`, else :meth:`noop`.
        """
        # A key without `__mro__` (not a class) still gets the exact look-up.
        for klass in getattr(exc, '__mro__', (exc,)):
            handler = self._except_hdlr.get(klass)
            if handler is not None:
                return handler
        return default or self.noop

    async def noop(self, ob, exception):
        """ The default function to handle exceptions. It does nothing. """
class Observer:
    """ The observer object implements a event dispatcher binds with websocket.
    Usually, it is living with another `Room` object is derived by `BaseRoom`
    class.

    .. admonition:: What if I need to use this class alone ?

        The class is designed to be created in a `Room` object, if you really
        need to use it alone, to remember register events in the `room` class
        attribute.

    :param ws: the websocket to binds with.
    :param cfg: the configuration dict from web server application.
    :param session: the session dict for this connection. An empty dict will be
                    created by default.
    :param validator: An callable function with a dict-like input, and return a
                      tuple (event_name, event_data). Defaults to access dict
                      key "event" and "data".
    """

    #: The default `room` object which depends on.
    room = BaseRoom()

    def __init__(self, ws, cfg, session=None, validator=None):
        #: Holds the output messages.
        self.tasks = asyncio.Queue()
        #: Use as DI to let JSON dumps/loads replace-able.
        self.dumps, self.loads = dumps, loads
        self.ws, self.cfg = ws, cfg
        # Test against None instead of truthiness: an empty dict handed in by
        # the caller must be kept, so both sides share the same object.
        self.session = dict() if session is None else session
        self.validator = validator or _validator

    async def trigger(self, event, data):
        """ To trigger an event with the data received. Every exception that a
        listener leaves uncaught is passed to the handler the room returns for
        its type.

        :param event: The event name as string.
        :param data: The data as dict that be propagated.
        """
        fns = self.room.listeners(event)
        futs = [f(self, data) for f in fns]
        # return_exceptions=True: one failing listener must not hide the
        # results (or failures) of the others.
        for e in await asyncio.gather(*futs, return_exceptions=True):
            if isinstance(e, BaseException):
                hdlr = self.room.except_handler(type(e))
                asyncio.ensure_future(hdlr(self, e))

    async def emit(self, event, *args, **kws):
        """ To publish result json to client with specific `event`. `args` will
        be added into the key `args` in `kws`. For example::

            await ob.emit("greeting", "hello", success=True)

        Is equivalent to the following::

            await ob.emit("greeting", args=["hello"], success=True)

        And Client will received the data like::

            {"event": "greeting", "data": {"args": ["hello"], "success": true}}

        :param event: The event name as string.
        :param args: The values that JSON serializable. added to "args" in `kws`.
        :param kws: The key/value should be serializable in JSON.
        """
        # Build a new list: `list.extend` returns None, and the list passed
        # in by the caller must not be mutated.
        kws['args'] = list(kws.get('args') or []) + list(args)
        data = self.dumps(dict(event=event, data=kws))
        await self.tasks.put(data)

    async def serve(self):
        """ To start communicating with client, this function will block until
        either the reader or the writer loop ends; the other one is cancelled
        afterwards.
        """
        # asyncio.wait() needs Tasks/Futures; handing it bare coroutines is
        # deprecated since Python 3.8 and rejected since 3.11.
        tasks = [asyncio.ensure_future(self._reader()),
                 asyncio.ensure_future(self._writer())]
        try:
            await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        finally:
            # Also reached when serve() itself gets cancelled, so neither loop
            # outlives it. Cancelling a finished task is a no-op.
            for task in tasks:
                task.cancel()

    async def _reader(self):
        """ Receive messages while the websocket is open and trigger the event
        of each one without waiting for its listeners to finish.
        """
        ws = self.ws
        while ws.open:
            try:
                data = await ws.recv()
                msg = self.loads(data)
                evt, data = self.validator(msg)
                coro = self.trigger(evt, data)
                asyncio.ensure_future(coro)
            except asyncio.CancelledError:
                # Cancellation has to end the loop, never reach a handler
                # (CancelledError derives from Exception before Python 3.8).
                raise
            except Exception as e:
                hdlr = self.room.except_handler(type(e))
                asyncio.ensure_future(hdlr(self, e))

    async def _writer(self):
        """ Send the queued output messages, one at a time and in queue order,
        while the websocket is open.
        """
        ws = self.ws
        while ws.open:
            try:
                data = await self.tasks.get()
                # Await the send, so that a failure is raised here and gets
                # dispatched below instead of being lost in a detached task.
                await ws.send(data)
            except asyncio.CancelledError:
                raise
            except Exception as e:
                hdlr = self.room.except_handler(type(e))
                asyncio.ensure_future(hdlr(self, e))
class Room(BaseRoom):
    """ A minimal implements for BaseRoom class. Implemented methods to register
    events and exceptions.

    .. admonition:: Change the Default Observer Class

        In default, we bind with :class:`Observer` to create observer object for
        connect with websocket, you can re-assign another class (that should be
        derived from :class:`Observer`) for :attr:`observer_class` to change the
        default.

        If you just want to add another kind of observer object to Room object,
        consider to use :meth:`add_roommate` instead.
    """

    #: Use Observer as the default Observer class.
    observer_class = Observer

    def on(self, event):
        """ A decorator that is used to register an async function for a given
        event name. The async function will receive observer object as the 1st
        argument, and the data from client as 2nd argument. An event allowed to
        register for multiple functions.

        It does the same thing as :meth:`subscribe` but is intended for decorator
        usage::

            @room.on("greeting")
            async def greeting(ob, data):
                await ob.emit("greeting", "hello there")

        :param event: The event name as string.
        :return: a decorator that registers the function and returns it
                 unchanged.
        """
        def decorator(cb):
            self.subscribe(event, cb)
            return cb
        return decorator

    def on_exception(self, exception):
        """ A decorator that is used to register an async function for a given
        exception class. The async function will receive observer object as the
        1st argument, and the raised exception instance as 2nd argument. It
        does the same thing as :meth:`register_exception` but is intended for
        decorator usage::

            @room.on_exception(IndexError)
            async def index_error(ob, exc):
                await ob.emit("Error", "An Exception catched.", repr(exc))

        :param exception: The exception class to register.
        :return: a decorator that registers the function and returns it
                 unchanged.
        """
        def decorator(cb):
            self.register_exception(exception, cb)
            return cb
        return decorator

    def make_observer(self, websocket, *args, **kws):
        """ Binds a websocket to a observer object that is an instance of
        :attr:`observer_class`. The observer is only created here, it is not
        added to the room.

        :param websocket: The websocket need to be binds.
        :param args, kws: position/keyword arguments that delegate for
                          :attr:`observer_class`.
        :return: a new observer object created via :attr:`observer_class`.
        """
        return self.observer_class(websocket, *args, **kws)

    def add_roommate(self, observer):
        """ Registers and binds a observer object to this room.

        :param observer: An observer object or the same, has to be an instance of
                         :class:`Observer`.
        """
        assert isinstance(observer, Observer)
        # Point the observer to this room, so its events and exceptions are
        # dispatched through the registries of this room.
        observer.room = self
        self.mates.add(observer)

    def join(self, ws, *args, **kws):
        """ Registers a connected websocket to this room, and return a observer
        object for manipulate.

        :param ws: The connected websocket.
        :param args: Position arguments that delegate to observer class.
        :param kws: Keyword-only arguments that delegate to observer class.
        :return: a new observer object created via :attr:`observer_class`.
        """
        ob = self.make_observer(ws, *args, **kws)
        self.add_roommate(ob)
        return ob

    def subscribe(self, evt, cb):
        """ Registers a event to invoke the async callback function. The async
        function will receive the observer object as the 1st argument, and the
        data from client as 2nd argument. An event allowed to register with
        multiple functions.

        Works exactly like the :meth:`on` decorator.
        For example::

            @room.on("greeting")
            async def greeting(ob, data):
                await ob.emit("greeting", "hello there")

        Is equivalent to the following::

            async def greeting(ob, data):
                await ob.emit("greeting", "hello there")

            room.subscribe("greeting", greeting)

        :param evt: The event name as string.
        :param cb: The async function as a callback that accept two arguments:
                   observer object and data.
        """
        assert callable(cb)
        # The listeners are kept in a set, registering the same function
        # twice for one event has no further effect.
        self._subscribes.setdefault(evt, set()).add(cb)

    def register_exception(self, exc, cb):
        """ Registers a exception class to invoke the async callback function.
        The function will receive the observer object as the 1st argument, and
        the raised exception instance as 2nd argument. Registering the same
        exception class again replaces the previous callback.

        Works exactly like the :meth:`on_exception` decorator.
        For example::

            @room.on_exception(IndexError)
            async def index_error(ob, exc):
                await ob.emit("Error", "An Exception catched.", repr(exc))

        Is equivalent to the following::

            async def index_error(ob, exc):
                await ob.emit("Error", "An Exception catched.", repr(exc))

            room.register_exception(IndexError, index_error)

        :param exc: The exception class to register.
        :param cb: The async function as a callback that accept two arguments:
                   observer object and the exception instance.
        """
        assert callable(cb)
        self._except_hdlr[exc] = cb

    def broadcast(self, caller, evt, data):
        """ Broadcast the event and data to all mates in this room except the
        caller, by triggering the event on each of them. This is not an async
        function but return a Future to be await.

        :param caller: An observer object or the same, has to be an instance of
                       :class:`Observer`.
        :param evt: The event name as string.
        :param data: The data that propagate to functions.
        :return: An pending Future from `asyncio.gather` for manipulate.
        """
        assert isinstance(caller, Observer)
        futs = [m.trigger(evt, data) for m in (self.mates - {caller})]
        return asyncio.gather(*futs)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment