Created
February 8, 2021 21:47
-
-
Save yoshimax/688bf6849ca2a477adeb87fd36a2d45b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# WebSocket server example that synchronizes state across clients and relays MQTT messages to them
import asyncio | |
import json | |
import logging | |
import websockets | |
from asyncio_mqtt import Client, MqttError | |
# Configure the root logger with the default handler, then create this
# module's own logger.
logging.basicConfig()
logger = logging.getLogger(__name__)

# Shared counter state; mutated by the WebSocket handler and broadcast to
# every client as the "state" event.
STATE = dict(value=0)

# WebSocket connections that are currently registered.
USERS = set()
## --------------- MQTT | |
async def mqtt_wait():
    """Relay MQTT messages to every connected WebSocket user, forever.

    Connects to the broker, subscribes to ``m5cp2/fromDevice`` and forwards
    each decoded payload to all sockets in ``USERS``. When the connection
    fails for any reason the error is logged and a new connection attempt is
    made after 3 seconds, so this coroutine never returns on its own.
    """
    while True:
        try:
            logger.info("Connecting to MQTT")
            # NOTE(review): broker address and credentials are hard-coded;
            # consider moving them to configuration.
            async with Client(
                "192.168.0.101",
                username="emqx",
                password="xxxxx",
                client_id="Python3",
            ) as client:
                logger.info("Connection to MQTT open")
                async with client.unfiltered_messages() as messages:
                    await client.subscribe("m5cp2/fromDevice")
                    async for message in messages:
                        payload = message.payload.decode()
                        logger.info("Message %s %s", message.topic, payload)
                        # Snapshot USERS so the set can change while sends
                        # are in flight, and skip the broadcast entirely when
                        # nobody is connected (the old asyncio.wait([]) call
                        # raised ValueError and dropped the MQTT connection).
                        recipients = list(USERS)
                        if recipients:
                            # return_exceptions=True: one broken socket must
                            # not tear down the MQTT connection.
                            await asyncio.gather(
                                *(user.send(payload) for user in recipients),
                                return_exceptions=True,
                            )
        except MqttError as e:
            logger.error("Connection to MQTT closed: %s", e)
        except Exception:
            # Top-level boundary of the relay loop: log with traceback and
            # fall through to the reconnect delay.
            logger.exception("Connection to MQTT closed")
        # Back off before reconnecting.
        await asyncio.sleep(3)
## --------------- WebSocket | |
def state_event():
    """Return the JSON "state" event: the contents of ``STATE`` plus a type tag."""
    payload = {"type": "state"}
    payload.update(STATE)
    return json.dumps(payload)
def users_event():
    """Return the JSON "users" event carrying the number of registered users."""
    payload = {"type": "users", "count": len(USERS)}
    return json.dumps(payload)
async def notify_state():
    """Broadcast the current shared state to every registered user.

    Does nothing when no users are registered.
    """
    # Snapshot the set so registrations during the sends cannot affect it.
    recipients = list(USERS)
    if not recipients:
        return
    message = state_event()
    # asyncio.wait() rejects bare coroutines on Python 3.11+ (deprecated
    # since 3.8); gather() schedules them itself. return_exceptions=True
    # keeps one failed send from aborting the rest of the broadcast, which
    # matches the old wait() behaviour of never raising send errors.
    await asyncio.gather(
        *(user.send(message) for user in recipients),
        return_exceptions=True,
    )
async def notify_users():
    """Broadcast the current user count to every registered user.

    Does nothing when no users are registered.
    """
    # Snapshot the set so registrations during the sends cannot affect it.
    recipients = list(USERS)
    if not recipients:
        return
    message = users_event()
    # asyncio.wait() rejects bare coroutines on Python 3.11+ (deprecated
    # since 3.8); gather() schedules them itself. return_exceptions=True
    # keeps one failed send from aborting the rest of the broadcast, which
    # matches the old wait() behaviour of never raising send errors.
    await asyncio.gather(
        *(user.send(message) for user in recipients),
        return_exceptions=True,
    )
async def register(websocket):
    """Start tracking ``websocket`` and broadcast the updated user count."""
    # Add first so the new socket is included in the broadcast below.
    USERS.add(websocket)
    await notify_users()
async def unregister(websocket):
    """Stop tracking ``websocket`` and broadcast the updated user count.

    Removal is idempotent: a socket that is not (or no longer) registered is
    ignored instead of raising ``KeyError``. This function runs from a
    ``finally`` block, where a ``KeyError`` would mask the exception that
    actually ended the connection.
    """
    USERS.discard(websocket)
    await notify_users()
async def counter(websocket, path=None):
    """Handle one WebSocket connection to the shared counter.

    Registers the socket, sends it the current state, then processes incoming
    JSON messages of the form ``{"action": "plus"}`` / ``{"action": "minus"}``,
    broadcasting the new state after each change. Messages that are not valid
    JSON, are not JSON objects, or carry any other action are logged and
    ignored. The socket is always unregistered when the connection ends.

    Args:
        websocket: The connection object supplied by the websockets server.
        path: Request path; unused. Optional so the handler works whether the
            server calls it with one argument or two.
    """
    try:
        # register() adds the socket to USERS and broadcasts the user count.
        # It is inside the try so a failed broadcast cannot leave the socket
        # registered forever.
        await register(websocket)
        await websocket.send(state_event())
        async for message in websocket:
            try:
                data = json.loads(message)
            except ValueError:
                # Covers json.JSONDecodeError and undecodable binary frames.
                logger.error("invalid JSON message: %r", message)
                continue
            # Client input is untrusted: tolerate non-object JSON and a
            # missing "action" key instead of crashing the handler.
            action = data.get("action") if isinstance(data, dict) else None
            if action == "minus":
                STATE["value"] -= 1
                await notify_state()
            elif action == "plus":
                STATE["value"] += 1
                await notify_state()
            else:
                # logging uses %-style placeholders; "{}" with an argument
                # produced a formatting error instead of the message.
                logger.error("unsupported event: %s", data)
    finally:
        await unregister(websocket)
async def websocket_wait():
    """Run the WebSocket server on ``localhost:6789`` until cancelled.

    This coroutine already executes inside a running event loop, so the
    server is started with ``async with`` rather than
    ``run_until_complete()`` / ``run_forever()``, which raise
    ``RuntimeError`` when called on a loop that is already running.
    """
    async with websockets.serve(counter, "localhost", 6789):
        # A future that is never resolved keeps the server open until this
        # task is cancelled; leaving the block then closes the server.
        await asyncio.Future()
async def _serve_forever():
    """Run the MQTT relay and the WebSocket server concurrently."""
    # gather() wraps the coroutines in tasks itself (asyncio.wait() rejects
    # bare coroutines on Python 3.11+) and propagates a failure in either
    # service instead of hiding it.
    await asyncio.gather(mqtt_wait(), websocket_wait())


def main():
    """Start the event loop and run both services until interrupted."""
    # The module-level logging.basicConfig() leaves the level at WARNING, so
    # the logger.info() calls are hidden; enable this for verbose output:
    # logging.basicConfig(level=logging.DEBUG,
    #                     format="%(asctime)s %(levelname)s %(message)s",
    #                     datefmt="%Y-%m-%d %H:%M:%S")
    asyncio.run(_serve_forever())


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment