-
-
Save pgrandinetti/964747a9f2464e576b8c6725da12c1eb to your computer and use it in GitHub Desktop.
import socket | |
import asyncio | |
import websockets | |
import time | |
import logging | |
import argparse | |
import threading | |
import sys | |
# Route every record at DEBUG level or above to stdout, using a
# "timestamp - logger name - level - message" layout.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.DEBUG,
    stream=sys.stdout,
)

# Module-level logger named after this module.
logger = logging.getLogger(__name__)
class WSClient():
    """Websocket listener that opens a new connection whenever one fails.

    Args:
        url: Websocket endpoint handed to ``websockets.connect``.
        **kwargs: Optional settings:

            * ``reply_timeout`` -- seconds to wait for a message before
              pinging the server (default 10).
            * ``ping_timeout`` -- seconds to wait for the pong (default 5).
            * ``sleep_time`` -- seconds to sleep before reconnecting
              (default 5).
            * ``callback`` -- callable invoked with each received message;
              may be omitted.

            For the three numeric settings a missing or falsy value
            (``None``, ``0``) selects the default.
    """

    def __init__(self, url, **kwargs):
        self.url = url
        # set some default values
        self.reply_timeout = kwargs.get('reply_timeout') or 10
        self.ping_timeout = kwargs.get('ping_timeout') or 5
        self.sleep_time = kwargs.get('sleep_time') or 5
        self.callback = kwargs.get('callback')

    async def listen_forever(self):
        """Receive messages in an endless loop, reconnecting on failure.

        Every received message is logged and, when a callback is set, passed
        to ``self.callback``. If no message arrives within ``reply_timeout``
        seconds, or the connection is reported closed, the server is pinged.
        A successful ping keeps the connection; a failed one triggers a sleep
        of ``sleep_time`` seconds and a fresh connection. Name-resolution
        errors and refused connections are retried the same way.

        This coroutine never returns normally; it ends only when an
        exception that is not handled here propagates out of it.
        """
        while True:
            # outer loop restarted every time the connection fails
            logger.debug('Creating new connection...')
            try:
                async with websockets.connect(self.url) as ws:
                    while True:
                        # listener loop
                        try:
                            reply = await asyncio.wait_for(
                                ws.recv(), timeout=self.reply_timeout)
                        except (asyncio.TimeoutError,
                                websockets.exceptions.ConnectionClosed):
                            try:
                                pong = await ws.ping()
                                await asyncio.wait_for(
                                    pong, timeout=self.ping_timeout)
                                logger.debug('Ping OK, keeping connection alive...')
                                continue
                            except Exception:
                                # Deliberately not a bare ``except``: that
                                # would also swallow KeyboardInterrupt and
                                # SystemExit (and, on Python 3.8+, task
                                # cancellation) and turn them into a
                                # reconnect attempt.
                                logger.debug(
                                    'Ping error - retrying connection in {} sec (Ctrl-C to quit)'.format(self.sleep_time))
                                await asyncio.sleep(self.sleep_time)
                                # leave the listener loop so that the outer
                                # loop opens a new connection
                                break
                        logger.debug('Server said > {}'.format(reply))
                        if self.callback:
                            self.callback(reply)
            except socket.gaierror:
                logger.debug(
                    'Socket error - retrying connection in {} sec (Ctrl-C to quit)'.format(self.sleep_time))
                await asyncio.sleep(self.sleep_time)
                continue
            except ConnectionRefusedError:
                logger.debug('Nobody seems to listen to this endpoint. Please check the URL.')
                logger.debug('Retrying connection in {} sec (Ctrl-C to quit)'.format(self.sleep_time))
                await asyncio.sleep(self.sleep_time)
                continue
def start_ws_client(client):
    """Run ``client.listen_forever()`` on a freshly created event loop.

    A new event loop is created, installed as the current event loop and
    used to drive the coroutine until it finishes or raises. The loop is
    always closed afterwards so its resources are released.

    Args:
        client: Object exposing a ``listen_forever()`` coroutine method.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(client.listen_forever())
    finally:
        # Without this the loop stays open when the coroutine ends or an
        # exception (e.g. Ctrl-C) propagates out of it.
        loop.close()
def callback_fn(data, *args, **kwargs):
    """Example message handler; replace the body with your own logic.

    The received ``data`` and any extra arguments are ignored: the function
    only emits a debug log line.
    """
    message = 'This is the callback speaking!'
    logger.debug(message)
if __name__ == '__main__':
    # Command-line entry point: parse the options, build the client with
    # callback_fn as message handler and listen until interrupted.
    parser = argparse.ArgumentParser()
    parser.add_argument('--url',
                        required=False,
                        # set here your URL
                        default='ws://katiedj.com:8080/macro/sample/',
                        dest='url',
                        help='Websocket URL')
    parser.add_argument('--reply-timeout',
                        required=False,
                        dest='reply_timeout',
                        type=int,
                        help='Timeout for reply from server')
    parser.add_argument('--ping-timeout',
                        required=False,
                        dest='ping_timeout',
                        # Without a type the value arrives as a str, which
                        # is not a valid timeout for asyncio.wait_for.
                        type=int,
                        default=None,
                        help='Timeout when pinging the server')
    parser.add_argument('--sleep',
                        required=False,
                        type=int,
                        dest='sleep_time',
                        default=None,
                        help='Sleep time before retrieving connection')
    args = parser.parse_args()
    # Options left unset are None; WSClient replaces them with its defaults.
    ws_client = WSClient(**vars(args), callback=callback_fn)
    start_ws_client(ws_client)
Thank you for the example - quick question not related to reconnect but to the callback. Should this not be done using call_soon, call_soon_threadsafe or run_in_executor rather than directly calling it?
@mjcumming
Good point. To do that, you have to make some changes because call_soon is a method of the event loop object.
I (very) quickly tested this approach:
- Create a method in the WSClient object
def set_loop(self, loop): self.loop=loop;
- Call that method between lines 69-70:
client.set_loop(loop)
- Replace line 54 with
self.loop.call_soon(self.callback, reply)
It seems to work fine.
That said, I believe whether this is a good approach or not depends on your application. Here's a quote from a SO answer:
Callbacks are also not expected to return anything; they are fire-and-forget routines, trusted to not lock up the whole system by running anything heavy or blocking. call_soon() returns a Handle() instance that only lets you cancel it again (a no-op if it already has been executed). The callbacks are executed next time the event loop checks the callback queue, at which point they (hopefully briefly) block any other work from being done*.
thank you for this example. this example has a callback when receive msg, but if i want to send data, how to get the ws client?
thank you for this example. this example has a callback when receive msg, but if i want to send data, how to get the ws client?
@chenkaiC4 This is a listener (client), not a producer (server). You can use this code to listen to data that are streamed by some server.
In this example, I listen to the data streamed by the server at http://katiedj.com (which I built as well). The code to build a server like katiedj.com is open-source https://github.com/pgrandinetti/katiedj
Feel free to open an issue in that project, if you need help creating your own server.
thank you for this example. this example has a callback when receive msg, but if i want to send data, how to get the ws client?
I had the same confusion and realized later.
For sending information, an additional http synchronization request is a better choice.
thank you for this example. this example has a callback when receive msg, but if i want to send data, how to get the ws client?
I had the same confusion and realized later.
For sending information, an additional http synchronization request is a better choice.
@silegon Can you suggest a way to make this clearer?
Thanks mate for this snippet!
It seems that this feature has been implemented in websockets using async for:

    async for websocket in websockets.connect(...):
        try:
            ...
        except websockets.ConnectionClosed:
            continue
Would you say that your snippet is still useful compared to the official solution?
Thanks,
Cocco
@coccoinomane It appears that async for in websockets was implemented as a response to the following, very old ticket that I opened, python-websockets/websockets#414 - and therefore I expect it to implement the same logic as this gist, or maybe a better one!
Short answer: Use async for as provided in websockets.
Great, thanks! Have a nice day :-)
Based on my project https://github.com/pgrandinetti/katiedj-listeners and on several requests received after my comment here python-websockets/websockets#414
Run it with
python automatic_websocket_reconnect.py
Tested with Python 3.7
Output of pip freeze