Skip to content

Instantly share code, notes, and snippets.

@yoshimax
Created February 8, 2021 21:47
Show Gist options
  • Save yoshimax/688bf6849ca2a477adeb87fd36a2d45b to your computer and use it in GitHub Desktop.
Save yoshimax/688bf6849ca2a477adeb87fd36a2d45b to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# WS server example that synchronizes state across clients
import asyncio
import json
import logging
import websockets
from asyncio_mqtt import Client, MqttError
# Install logging's default configuration on the root logger.
logging.basicConfig()
logger = logging.getLogger(__name__)
# Shared counter state; its keys are merged into every "state" event.
STATE = {"value": 0}
# WebSocket connections currently registered (see register / unregister).
USERS = set()
## --------------- MQTT
async def mqtt_wait():
    """Forward MQTT messages to every registered WebSocket client, forever.

    Connects to the broker, subscribes to ``m5cp2/fromDevice`` and sends the
    decoded payload of each incoming message to all sockets in ``USERS``.
    Any failure is logged and the connection is re-established after a
    three second pause.
    """
    while True:
        try:
            logger.info("Connecting to MQTT")
            # NOTE(review): broker address and credentials are hard-coded;
            # consider loading them from configuration or the environment.
            async with Client("192.168.0.101", username="emqx", password="xxxxx", client_id="Python3") as client:
                logger.info("Connection to MQTT open")
                async with client.unfiltered_messages() as messages:
                    await client.subscribe('m5cp2/fromDevice')
                    async for message in messages:
                        payload = message.payload.decode()
                        logger.info("Message %s %s", message.topic, payload)
                        # Skip the broadcast when nobody is connected: waiting
                        # on an empty collection raised ValueError, which the
                        # handler below turned into a needless reconnect.
                        if USERS:
                            # gather() accepts coroutines directly (asyncio.wait
                            # rejects them on Python 3.11+); return_exceptions
                            # keeps one broken socket from aborting the bridge.
                            await asyncio.gather(
                                *[user.send(payload) for user in USERS],
                                return_exceptions=True,
                            )
        except MqttError as e:
            logger.error("Connection to MQTT closed: %s", e)
        except Exception:
            logger.exception("Connection to MQTT closed")
        # Back off briefly before reconnecting.
        await asyncio.sleep(3)
## --------------- WebSocket
def state_event():
    """Return the JSON text of a "state" event carrying the current STATE."""
    event = {"type": "state"}
    # Later keys win, exactly as with dict unpacking.
    event.update(STATE)
    return json.dumps(event)
def users_event():
    """Return the JSON text of a "users" event with the number of clients."""
    connected = len(USERS)
    event = dict(type="users", count=connected)
    return json.dumps(event)
async def notify_state():
    """Send the current state event to every registered client."""
    if not USERS:
        return
    message = state_event()
    # asyncio.wait() no longer accepts bare coroutines (TypeError on
    # Python 3.11+); gather() schedules them itself. return_exceptions=True
    # keeps the old best-effort behaviour: a failing socket does not raise.
    await asyncio.gather(
        *[user.send(message) for user in USERS],
        return_exceptions=True,
    )
async def notify_users():
    """Send the current user-count event to every registered client."""
    if not USERS:
        return
    message = users_event()
    # asyncio.wait() no longer accepts bare coroutines (TypeError on
    # Python 3.11+); gather() schedules them itself. return_exceptions=True
    # keeps the old best-effort behaviour: a failing socket does not raise.
    await asyncio.gather(
        *[user.send(message) for user in USERS],
        return_exceptions=True,
    )
async def register(websocket):
    """Add *websocket* to USERS, then broadcast the updated user count."""
    # The socket is added first so that it is included in the broadcast.
    USERS.add(websocket)
    await notify_users()
async def unregister(websocket):
    """Remove *websocket* from USERS, then broadcast the updated user count.

    Removing a socket that is not registered is a no-op: this function is
    called from a ``finally`` block, where a KeyError would hide the
    exception that actually ended the connection.
    """
    USERS.discard(websocket)
    await notify_users()
async def counter(websocket, path=None):
    """Handle one WebSocket client of the shared counter.

    Registers the client, sends it the current state, then applies each
    ``{"action": "plus"}`` / ``{"action": "minus"}`` message to ``STATE`` and
    broadcasts the new state. Malformed or unsupported messages are logged
    and ignored. The client is always unregistered when the handler ends.

    Args:
        websocket: The client connection.
        path: Request path; unused. Optional so the handler can be called
            with either one or two arguments.
    """
    deltas = {"minus": -1, "plus": 1}
    try:
        # Registering inside the try guarantees the matching unregister even
        # if notifying the other clients fails.
        await register(websocket)
        await websocket.send(state_event())
        async for message in websocket:
            try:
                data = json.loads(message)
            except ValueError:
                # Covers JSONDecodeError and undecodable bytes.
                logger.error("invalid JSON message: %r", message)
                continue
            action = data.get("action") if isinstance(data, dict) else None
            # Only strings can be valid actions; this also avoids hashing
            # an unhashable value such as a list.
            delta = deltas.get(action) if isinstance(action, str) else None
            if delta is None:
                # logging uses %-style placeholders, not "{}".
                logger.error("unsupported event: %s", data)
                continue
            STATE["value"] += delta
            await notify_state()
    finally:
        await unregister(websocket)
async def websocket_wait():
    """Serve the counter WebSocket endpoint on localhost:6789 until cancelled.

    This coroutine already runs inside an event loop, so the server is
    started with ``async with`` instead of ``run_until_complete`` /
    ``run_forever``, which raise RuntimeError on a running loop.
    """
    async with websockets.serve(counter, "localhost", 6789):
        # A future that is never resolved keeps the server open until this
        # task is cancelled; leaving the block then closes the server.
        await asyncio.Future()
def main():
    """Run the MQTT bridge and the WebSocket server until both have finished."""
    # logging.basicConfig(level=logging.DEBUG,
    # format="%(asctime)s %(levelname)s %(message)s",
    # datefmt="%Y-%m-%d %H:%M:%S")

    def _report_failure(task):
        # Log a task's exception instead of letting it vanish silently.
        if task.cancelled():
            return
        error = task.exception()
        if error is not None:
            logger.error("Task %r failed", task, exc_info=error)

    async def _run_all():
        # asyncio.wait() requires tasks/futures (bare coroutines are a
        # TypeError on Python 3.11+), and tasks need a running loop, so they
        # are created here rather than before asyncio.run().
        tasks = [
            asyncio.create_task(mqtt_wait()),
            asyncio.create_task(websocket_wait()),
        ]
        for task in tasks:
            task.add_done_callback(_report_failure)
        await asyncio.wait(tasks)

    asyncio.run(_run_all())
# Module-level entry point: runs whenever this file is executed or imported.
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment