Skip to content

Instantly share code, notes, and snippets.

@davebshow
Last active April 1, 2016 17:25
Show Gist options
  • Save davebshow/7913b06ee028e637cac27c6465a06afb to your computer and use it in GitHub Desktop.
Save davebshow/7913b06ee028e637cac27c6465a06afb to your computer and use it in GitHub Desktop.
import asyncio
import pulsar
from pulsar.apps import ws, wsgi
class Echo(ws.WS):
def on_bytes(self, websocket, msg):
websocket._loop.call_soon(
pulsar.ensure_future, self._call(websocket, msg))
@asyncio.coroutine
def _call(self, websocket, msg):
app = yield from pulsar.get_application('wsgi')
websocket.write(msg)
websocket.write_close()
class Site(wsgi.LazyWsgi):
def setup(self, environ):
return wsgi.WsgiHandler(
[ws.WebSocket('/', Echo())])
def server(**kwargs):
return wsgi.WSGIServer(callable=Site(), **kwargs)
if __name__ == '__main__': # pragma nocover
server().start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment