-
-
Save pgrandinetti/964747a9f2464e576b8c6725da12c1eb to your computer and use it in GitHub Desktop.
| import socket | |
| import asyncio | |
| import websockets | |
| import time | |
| import logging | |
| import argparse | |
| import threading | |
| import sys | |
# Route every record (DEBUG and above) to stdout, prefixed with a timestamp,
# the logger name and the level.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.DEBUG,
    stream=sys.stdout,
)

# Module-level logger shared by everything defined in this file.
logger = logging.getLogger(__name__)
class WSClient():
    """Websocket listener that reconnects whenever the connection is lost.

    Args:
        url: Websocket endpoint to connect to.
        **kwargs: Optional settings. A value of ``None`` (or a missing key)
            selects the default; any other value, including ``0``, is used
            as given.
            reply_timeout: Seconds to wait for a message before probing the
                connection with a ping (default 10).
            ping_timeout: Seconds to wait for the pong (default 5).
            sleep_time: Seconds to sleep before reconnecting (default 5).
            callback: Callable invoked with each received message
                (default ``None``, i.e. no callback).
    """

    def __init__(self, url, **kwargs):
        self.url = url
        # set some default values
        self.reply_timeout = self._value_or(kwargs.get('reply_timeout'), 10)
        self.ping_timeout = self._value_or(kwargs.get('ping_timeout'), 5)
        self.sleep_time = self._value_or(kwargs.get('sleep_time'), 5)
        self.callback = kwargs.get('callback')

    @staticmethod
    def _value_or(value, default):
        """Return *default* only when *value* is None, so an explicit 0 is kept."""
        return default if value is None else value

    async def _connection_alive(self, ws):
        """Ping *ws* and report whether the pong arrived within ``ping_timeout``.

        Cancellation is re-raised so the listener can be stopped; any other
        failure (closed connection, timeout, ...) is reported as ``False``.
        """
        try:
            pong = await ws.ping()
            await asyncio.wait_for(pong, timeout=self.ping_timeout)
        except asyncio.CancelledError:
            # Never swallow cancellation: the task must be stoppable.
            raise
        except Exception:
            return False
        return True

    async def listen_forever(self):
        """Receive messages until cancelled, reconnecting on failure.

        Each received message is logged and handed to ``self.callback`` when
        one is set. If no message arrives within ``reply_timeout`` (or the
        connection is reported closed), the connection is probed with a ping;
        a failed ping triggers a reconnect after ``sleep_time`` seconds.
        ``socket.gaierror`` and ``ConnectionRefusedError`` are also retried
        after ``sleep_time`` seconds; any other exception propagates to the
        caller.
        """
        while True:
            # outer loop restarted every time the connection fails
            logger.debug('Creating new connection...')
            try:
                async with websockets.connect(self.url) as ws:
                    while True:
                        # listener loop
                        try:
                            reply = await asyncio.wait_for(
                                ws.recv(), timeout=self.reply_timeout)
                        except (asyncio.TimeoutError,
                                websockets.exceptions.ConnectionClosed):
                            if await self._connection_alive(ws):
                                logger.debug('Ping OK, keeping connection alive...')
                                continue
                            logger.debug(
                                'Ping error - retrying connection in %s sec (Ctrl-C to quit)',
                                self.sleep_time)
                            await asyncio.sleep(self.sleep_time)
                            # Leave the listener loop so a new connection is made.
                            break
                        logger.debug('Server said > %s', reply)
                        if self.callback:
                            self.callback(reply)
            except socket.gaierror:
                logger.debug(
                    'Socket error - retrying connection in %s sec (Ctrl-C to quit)',
                    self.sleep_time)
                await asyncio.sleep(self.sleep_time)
                continue
            except ConnectionRefusedError:
                logger.debug('Nobody seems to listen to this endpoint. Please check the URL.')
                logger.debug('Retrying connection in %s sec (Ctrl-C to quit)', self.sleep_time)
                await asyncio.sleep(self.sleep_time)
                continue
def start_ws_client(client):
    """Run ``client.listen_forever()`` to completion on a fresh event loop.

    A new loop is created and installed as the current loop for the calling
    thread. The call blocks until the coroutine returns or raises. The loop
    is always closed and unregistered afterwards, so an exception or Ctrl-C
    does not leave an open loop behind.

    Args:
        client: Object exposing an ``async def listen_forever()`` method.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(client.listen_forever())
    finally:
        # Unregister before closing so nothing picks up a closed loop.
        asyncio.set_event_loop(None)
        loop.close()
def callback_fn(data, *args, **kwargs):
    """Example message handler: ignores its arguments and logs a fixed line.

    Args:
        data: The received message (unused here).
        *args: Ignored.
        **kwargs: Ignored.
    """
    # Write here your logic
    logger.log(logging.DEBUG, 'This is the callback speaking!')  # ignore data
def build_arg_parser():
    """Build the command-line parser for the listener script.

    All options are optional. The timeout and sleep options are parsed as
    integers and default to ``None``.

    Returns:
        argparse.ArgumentParser: Parser exposing ``url``, ``reply_timeout``,
        ``ping_timeout`` and ``sleep_time``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--url',
                        required=False,
                        # set here your URL
                        default='ws://katiedj.com:8080/macro/sample/',
                        dest='url',
                        help='Websocket URL')
    parser.add_argument('--reply-timeout',
                        required=False,
                        dest='reply_timeout',
                        type=int,
                        help='Timeout for reply from server')
    parser.add_argument('--ping-timeout',
                        required=False,
                        dest='ping_timeout',
                        # Without a type the value would reach asyncio.wait_for
                        # as a string and every ping would fail.
                        type=int,
                        default=None,
                        help='Timeout when pinging the server')
    parser.add_argument('--sleep',
                        required=False,
                        type=int,
                        dest='sleep_time',
                        default=None,
                        help='Sleep time before retrieving connection')
    return parser


if __name__ == '__main__':
    args = build_arg_parser().parse_args()
    ws_client = WSClient(**vars(args), callback=callback_fn)
    start_ws_client(ws_client)
Thank you for this example. It has a callback for received messages, but if I want to send data, how do I get hold of the WS client?
I had the same confusion and realized later.
For sending information, an additional http synchronization request is a better choice.
Thank you for this example. It has a callback for received messages, but if I want to send data, how do I get hold of the WS client?
I had the same confusion and realized later.
For sending information, an additional http synchronization request is a better choice.
@silegon Can you suggest a way to make this clearer?
Thanks mate for this snippet!
It seems that the feature has been implemented in websockets using async for:
async for websocket in websockets.connect(...):
try:
...
except websockets.ConnectionClosed:
continue
Would you say that your snippet is still useful compared to the official solution?
Thanks,
Cocco
@coccoinomane It appears that async for in websockets was implemented in response to the following, very old ticket that I opened, python-websockets/websockets#414, and therefore I expect it to implement the same logic as this gist, or maybe a better one!
Short answer: Use async for as provided in websockets.
Great, thanks! Have a nice day :-)
@chenkaiC4 This is a listener (client), not a producer (server). You can use this code to listen to data that are streamed by some server.
In this example, I listen to the data streamed by the server at http://katiedj.com (which I built as well). The code to build a server like katiedj.com is open-source https://github.com/pgrandinetti/katiedj
Feel free to open an issue in that project, if you need help creating your own server.