Created
January 19, 2017 23:14
-
-
Save FirefighterBlu3/d3fd3f51fcc8d7ba9c24cbe06d22cabf to your computer and use it in GitHub Desktop.
Free-threaded WAMP client that runs separately from the main process.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
__version__ = '1.5' | |
__released__ = '2016-Jul-6 12:50A EST' | |
__author__ = '[email protected]' | |
__license__ = 'Apache 2.0' | |
import asyncio | |
import configparser | |
import sys | |
import time | |
import txaio | |
import signal | |
import socket | |
import logging | |
import threading | |
import functools | |
import types | |
import datetime | |
import traceback | |
import warnings | |
def warn_with_traceback(message, category, filename, lineno, file=None, line=None):
    """Show a warning together with the stack that triggered it.

    Drop-in replacement for ``warnings.showwarning``.  The current call
    stack is printed first (``traceback.print_stack`` with its default
    destination), then the warning text produced by
    ``warnings.formatwarning`` is written to ``file`` when that object has a
    ``write`` attribute, otherwise to ``sys.stderr``.
    """
    traceback.print_stack()

    # Anything without a write() method (including the default None) falls
    # back to stderr.
    if hasattr(file, 'write'):
        sink = file
    else:
        sink = sys.stderr

    formatted = warnings.formatwarning(message, category, filename, lineno, line)
    sink.write(formatted)


# Route every warning raised in this process through the hook above.
warnings.showwarning = warn_with_traceback
# 3rd party aio library for asyncio friendly Queue | |
# https://github.com/aio-libs/janus.git | |
import janus | |
from autobahn.websocket.util import parse_url | |
from autobahn.asyncio.websocket import WampWebSocketClientFactory | |
from autobahn.asyncio.wamp import ApplicationSession | |
from autobahn.wamp.types import (ComponentConfig, SessionDetails, EventDetails, CallDetails, PublishOptions, CallOptions, | |
CloseDetails, Challenge, SubscribeOptions, RegisterOptions) | |
from concurrent.futures import CancelledError | |
from concurrent.futures import ProcessPoolExecutor | |
class _Component(ApplicationSession):
    """WAMP session that joins with ticket authentication.

    The session reads its credentials from the ``ConfigParser`` stored under
    ``extra['cfg']`` and resolves ``join_future`` with the session details
    once the realm has been joined.  When the session leaves, the event loop
    handed to the constructor is stopped.
    """

    close_reason = None
    _shutdown = False
    _loop = None

    def __init__(self, realm: str, extra: dict, loop, join_future: asyncio.Future):
        """
        :param realm: realm name used to build the ``ComponentConfig``.
        :param extra: extra configuration; must contain a ``'cfg'`` entry
            holding the parsed configuration.
        :param loop: event loop that is stopped in :meth:`onLeave`.
        :param join_future: future resolved with the ``SessionDetails`` in
            :meth:`onJoin`.
        """
        super().__init__(ComponentConfig(realm, extra))
        self.__join_future = join_future
        self.event_loop = loop
        self.log = logging.getLogger()

    def onConnect(self):
        """Start joining the configured realm using the ``ticket`` method."""
        self.close_reason = None
        self.cfg = self.config.extra['cfg']
        realm = self.cfg.get('wamp', 'realm')
        authid = self.cfg.get('wamp', 'cli username')

        self.log.debug("\x1b[1;34mClientSession connected: Joining realm <{}> under authid <{}>\x1b[0m".format(realm, authid))
        self.join(realm, ['ticket'], authid)

    def onChallenge(self, challenge):
        """Answer a ``ticket`` challenge with the configured password.

        :raises Exception: for any other authentication method.
        """
        self.log.debug("ClientSession received: {}".format(challenge))
        if challenge.method == 'ticket':
            return self.cfg.get('wamp', 'cli password')
        else:
            raise Exception("Invalid authmethod {}".format(challenge.method))

    # ``asyncio.coroutine`` was removed in Python 3.11; a native coroutine
    # keeps this callback awaitable on current interpreters.
    async def onJoin(self, details: SessionDetails):
        """Publish the session details to whoever is waiting on the join."""
        self.log.debug('onJoin')
        join_future, self.__join_future = self.__join_future, None
        # The waiter may already have cancelled the future (for example on a
        # join timeout); setting a result then would raise InvalidStateError.
        if join_future is not None and not join_future.done():
            join_future.set_result(details)

    def _publish_msg(self, channel, message):
        """Publish ``message`` on ``channel`` if the session is attached.

        Failures are logged rather than raised so the caller's loop keeps
        running.
        """
        try:
            if not self.is_attached():
                self.log.warning('lost session with router')
            else:
                self.publish(channel, message)
        except Exception:
            self.log.critical(traceback.format_exc())

    def onLeave(self, details):
        """Record why the session ended and stop the event loop."""
        # also, autobahn-python's .leave() method doesn't
        # propagate the reason or log_message, #todo, file bug
        super().onLeave(details)
        self.close_reason = details.reason
        if self.close_reason not in ('wamp.close.logout', 'wamp.close.normal'):
            print('unexpected communication loss from router:', self.close_reason)

        self.event_loop.stop()

    def onDisconnect(self, *args, **kwargs):
        """Delegate to the base class, ignoring any extra arguments."""
        super().onDisconnect()
class WampClient():
    """Background WAMP publisher that runs beside the caller's own code.

    Constructing an instance reads ``/var/bluelabs/etc/hive.conf``, validates
    its ``[wamp]`` section and submits two jobs to the default executor of
    the current asyncio event loop:

    * :meth:`app` creates a private event loop and maintains the router
      connection on it;
    * :meth:`log_pusher` takes queued messages and hands them to the session.

    Messages are queued with :meth:`publish`; :meth:`shutdown` ends the
    session and stops the private loop.
    """

    # we won't use the built in ApplicationRunner as we need to be able
    # to intersperse within the loop

    def __init__(self, id=None):
        """
        :param id: identifier for this client; the host's fully qualified
            domain name is used when omitted.
        :raises KeyError: if a required option is missing from ``[wamp]``.
        """
        self.log = logging.getLogger()

        cfg = configparser.ConfigParser()
        cfg.read('/var/bluelabs/etc/hive.conf')
        self.extra = {'cfg': cfg}

        self.session = None
        self.session_details = None
        self.wamp_eventloop = None
        self.starttime = datetime.datetime.now(tz=datetime.timezone.utc)
        self.wamp_established = False
        self.last_publish_ts = None
        self._shutdown = False
        # Set by shutdown() so log_pusher stops waiting for a session that
        # never appeared.
        self._pusher_abort = threading.Event()

        # Always define the attribute, including when the caller passes an id.
        self.id = id or socket.getfqdn()

        required = ('__version__', 'irl', 'realm', 'cli username', 'cli password', 'join timeout')
        # A missing section is reported the same way as missing options.
        wamp_cfg = cfg['wamp'] if cfg.has_section('wamp') else {}
        missing = [k for k in required if k not in wamp_cfg]
        for k in missing:
            print("section [wamp]; required config option '{}' not found".format(k))
        if missing:
            raise KeyError('missing required config values')

        self.realm = cfg.get('wamp', 'realm')
        self.irl = cfg.get('wamp', 'irl')
        self.client_version = cfg.get('wamp', '__version__')
        self.join_timeout = int(cfg.get('wamp', 'join timeout'))

        self.loop = asyncio.get_event_loop()
        self.q = janus.Queue(loop=self.loop)

        # Keep the executor futures so failures in the workers stay inspectable.
        self._pusher_future = self.loop.run_in_executor(None, self.log_pusher, self.q.sync_q)
        self._app_future = self.loop.run_in_executor(None, self.app)

    def log_pusher(self, queue):
        ''' this runs in its own thread so the main process runs freely as does
            the WAMP client (freely: within the context of python threading)

            Takes ``(channel, timestamp, priority, message)`` tuples from
            ``queue`` and passes them to the session's ``_publish_msg``.
            Returns once the queue has run empty after shutdown was
            requested, or when shutdown finished without any session.
        '''
        waits = 0
        while True:
            while not self.session:
                if self._pusher_abort.is_set():
                    return
                self.log.debug('not connected to WAMP router')
                time.sleep(.25)

            # Poll quickly at first, then back off after 30 empty polls.
            timeout = .1 if waits < 30 else 10

            while True:
                try:
                    channel, when, priority, message = queue.get(timeout=timeout)
                    payload = {
                        'v': self.client_version,
                        'ts': when.strftime('%F %T +0000'),
                        'priority': priority,
                        'msg': message,
                    }
                    self.session._publish_msg(channel, payload)
                    self.last_publish_ts = datetime.datetime.now(tz=datetime.timezone.utc)
                    queue.task_done()
                    waits = 0
                except janus.SyncQueueEmpty:
                    waits += 1
                    break
                except Exception as e:
                    # Use the instance logger: a module-level ``log`` only
                    # exists when this file is run as a script.
                    self.log.critical('queue error: {}'.format(e))
                    break

            if self._shutdown:
                break

    def publish(self, message=None, channel=None, priority=None):
        """Queue ``message`` for publication.

        :param message: payload; falsy values are ignored.
        :param channel: topic, ``'hive.system.debug'`` when omitted.
        :param priority: priority label, ``'info'`` when omitted.
        """
        if not message:
            return

        channel = channel or 'hive.system.debug'
        priority = priority or 'info'
        ts = datetime.datetime.now(tz=datetime.timezone.utc)
        self.loop.run_until_complete(self.q.async_q.put((channel, ts, priority, message)))

    def app(self) -> None:
        """
        We use our own ApplicationRunner here which is almost an identical copy of
        wamp.ApplicationRunner. The difference being that we need to:

          a) explicitly get a new asyncio event loop because we aren't running
             in the main thread - we'll get a RuntimeError: There is no current
             event loop in thread <thread name>, and
          b) don't set a signal handler for SIGTERM, also because we're not
             running in the main thread

        The loop reconnects when the session reports
        ``wamp.close.transport_lost``; any other outcome ends the method,
        which closes the private loop before returning.
        """
        #txaio.start_logging(level='debug')

        isSecure, host, port, resource, path, params = parse_url(self.irl)
        # NOTE(review): TLS is always requested; ``isSecure`` from the URL is
        # not consulted -- confirm this is intended.
        ssl = True
        serializers = None

        loop = txaio.config.loop = asyncio.new_event_loop()
        self.wamp_eventloop = loop
        asyncio.set_event_loop(loop)

        async def _connect():
            """Connect and join; retry while the join times out."""
            while True:
                self.log.debug('Connecting to router ')
                join_future = loop.create_future()
                session_factory = functools.partial(_Component, self.realm, self.extra, loop, join_future)
                transport_factory = WampWebSocketClientFactory(
                    session_factory, url=self.irl, serializers=serializers, loop=loop)

                transport, protocol = await loop.create_connection(
                    transport_factory, host, port, ssl=ssl)

                try:
                    # Connection established; wait for onJoin to finish.
                    # (The ``loop`` argument of wait_for was removed in
                    # Python 3.10; we are already running on ``loop``.)
                    self.session_details = await asyncio.wait_for(join_future, timeout=3.0)
                    self.session = protocol._session
                    break
                except (asyncio.TimeoutError,):
                    self.log.warning('router connection timeout')
                    # absorb the concurrent.futures._base.CancelledError error
                    try:
                        await asyncio.wait([join_future])
                    except Exception as e:
                        self.log.critical('unexpected error while connecting to router: {}'.format(e))
                    transport.close()
                    continue
                except CancelledError:
                    try:
                        await asyncio.wait([join_future])
                    except Exception as e:
                        self.log.critical('unexpected error while connecting to router: {}'.format(e))
                    break
                except Exception:
                    self.log.critical(traceback.format_exc())
                    transport.close()
                    break

        tasks = []
        while True:
            self.session = None
            connect_task = loop.create_task(_connect())
            tasks = [connect_task]

            try:
                loop.run_until_complete(asyncio.wait(tasks))
            except CancelledError:
                break
            except Exception as e:
                self.log.critical('unexpected error while connecting to router: {} {}**'.format(e.__class__, e))
                break

            # asyncio.wait() does not re-raise a task's failure, so inspect
            # the task instead of assuming the connection succeeded.
            if connect_task.cancelled():
                break
            failure = connect_task.exception()
            if failure is not None:
                self.log.critical('unexpected error while connecting to router: {} {}**'.format(failure.__class__, failure))
                break

            self.wamp_established = self.session is not None
            if not self.wamp_established:
                break

            try:
                loop.run_forever()
                if self.session.close_reason:
                    self.log.critical('session close reason: {}'.format(self.session.close_reason))
            except Exception:
                self.log.critical(traceback.format_exc())

            if self.session.close_reason == 'wamp.close.transport_lost':
                continue

            break

        # cleanup: give a cancelled connect task the chance to finish
        try:
            if tasks:
                loop.run_until_complete(asyncio.wait(tasks))
        except (Exception, CancelledError) as e:
            # best effort only; the loop is closed right below
            self.log.debug('ignoring error during WAMP loop cleanup: {!r}'.format(e))

        if loop.is_running():
            loop.stop()
        loop.close()

    def shutdown(self):
        """Flush what can be flushed, leave the session and stop the loop.

        If messages are still queued and the session has not been
        established, waits until ``join timeout`` seconds after construction
        for the join and then for the queue to empty.
        """
        def utcnow():
            return datetime.datetime.now(tz=datetime.timezone.utc)

        now = utcnow()
        whence = self.starttime + datetime.timedelta(seconds=self.join_timeout)
        self._shutdown = True

        pending = self.q.async_q
        if not pending.empty():
            if not self.wamp_established and now < whence:
                self.log.debug('waiting for publish queue to be sent')
                # wait up to ``join timeout`` seconds (counted from
                # construction) for the client to connect
                while utcnow() < whence:
                    time.sleep(.5)
                    if self.wamp_established:
                        self.log.debug('session joined, publishing')
                        # wait for the pusher thread to drain the queue
                        while not pending.empty():
                            time.sleep(.5)
                        break

                if not self.wamp_established:
                    self.log.warning('WAMP session still not established, giving up')
            else:
                self.log.warning('publish queue unable to be sent within timeout, losing messages')

        # pause for at least one second to let the provider get our message(s)
        # and query the router about our session details before we leave and
        # the session details vanish (bug? crossbar should cache them)
        now = utcnow()
        if self.last_publish_ts and now - self.last_publish_ts < datetime.timedelta(seconds=1):
            time.sleep(1)

        if self.session:
            # NOTE(review): leave() is invoked from the calling thread rather
            # than the WAMP loop thread -- confirm that is safe for the
            # transport in use.
            try:
                self.session.leave('wamp.close.logout', 'logged out')
            except Exception as e:
                # Still stop the loop below even if leaving fails.
                self.log.warning('could not leave WAMP session cleanly: {}'.format(e))

        loop = self.wamp_eventloop
        if loop is not None and not loop.is_closed():
            # ``asyncio.Task.all_tasks`` was removed in Python 3.9.
            all_tasks = getattr(asyncio, 'all_tasks', None) or asyncio.Task.all_tasks
            try:
                for task in all_tasks(loop=loop):
                    if not task.done():
                        loop.call_soon_threadsafe(task.cancel)

                # how do we a) test if messages are still in queue or b) understand that
                # we do have messages in queue but can't send them? or c) we ran so fast
                # that we haven't even connected to the router yet?

                # The loop runs in another thread, so stop() has to be
                # scheduled through call_soon_threadsafe.
                loop.call_soon_threadsafe(loop.stop)
            except RuntimeError as e:
                # the loop was closed by its own thread in the meantime
                self.log.debug('WAMP event loop already closed: {}'.format(e))

        # Release log_pusher if it is still waiting for a session.
        self._pusher_abort.set()
if __name__ == '__main__':
    # Demo: publish ten long test messages through a WampClient, then shut
    # the client down.
    logging.basicConfig()
    #logging.captureWarnings(True)
    log = logging.getLogger()
    log.setLevel(logging.DEBUG)

    log.debug('\x1b[1;37m'+'='*80+'\x1b[0m')

    # Bound up front so the ``finally`` clause can tell whether the
    # constructor succeeded.
    wc = None
    try:
        wc = WampClient()
        for n in range(10):
            wc.publish('test message with lots of foo diddly doo hum etc so we can make a really long long long long line that is intended to wrap. oohhhh, almost. just a few more words ought to do it! {}'.format(n))
    except KeyboardInterrupt:
        print()
    except Exception:
        log.critical(traceback.format_exc())
    finally:
        log.debug('shutting down client')
        if wc is not None:
            try:
                wc.shutdown()
            except Exception:
                log.critical(traceback.format_exc())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment