Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save misterunknown/78bd3c580111fa45a55257aec0a981e0 to your computer and use it in GitHub Desktop.
Save misterunknown/78bd3c580111fa45a55257aec0a981e0 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import asyncio
import datetime
import random
import websockets
USERS = set()
async def register(websocket):
USERS.add(websocket)
await broadcast_message("New User connected")
async def unregister(websocket):
USERS.remove(websocket)
await broadcast_message("A User disconnected")
async def broadcast_message(message, origin_websocket=None):
if USERS:
tag = "user"+str(id(origin_websocket)) if origin_websocket else "SYSTEM"
tag = tag.ljust(15)
if origin_websocket and len(USERS) < 2:
return
await asyncio.wait([user.send("["+tag+"] "+message) for user in USERS if user is not origin_websocket])
async def chatroom(websocket, path):
await register(websocket)
print("NEW USER CONNECTED. Now: "+str(len(USERS)))
try:
async for message in websocket:
await broadcast_message(message, websocket)
finally:
await unregister(websocket)
print("USER DISCONNECTED. Now: "+str(len(USERS)))
try:
start_server = websockets.serve(chatroom, "0.0.0.0", "5000")
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
except KeyboardInterrupt:
print("Stopped")
pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment