- Theoretical Background
- WebSocket vs HTTP
- FastAPI WebSocket Implementation
- Best Practices
- Common Patterns
- Error Handling
- Testing
- Security Considerations
WebSocket - это протокол связи, обеспечивающий полнодуплексный канал связи через единое TCP-соединение. Основные характеристики:
- Постоянное соединение между клиентом и сервером
- Двунаправленная передача данных
- Низкая задержка
- Поддержка текстовых и бинарных данных
- Клиент отправляет HTTP запрос с заголовком
Upgrade: websocket
- Сервер отвечает с кодом 101 (Switching Protocols)
- TCP соединение остается открытым для WebSocket коммуникации
Характеристика | WebSocket | HTTP |
---|---|---|
Соединение | Постоянное | Временное |
Направление | Двунаправленное | Запрос-ответ |
Overhead | Минимальный после установки | Каждый запрос имеет заголовки |
Use Cases | Чаты, real-time данные, игры | CRUD операции, API |
from fastapi import FastAPI, WebSocket
from typing import List
app = FastAPI()
class ConnectionManager:
def __init__(self):
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
async def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def broadcast(self, message: str):
for connection in self.active_connections:
await connection.send_text(message)
manager = ConnectionManager()
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
await manager.broadcast(f"Client #{client_id}: {data}")
except Exception as e:
print(f"Error: {e}")
finally:
await manager.disconnect(websocket)
from fastapi import FastAPI, WebSocket, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from pydantic import BaseModel
import jwt
from typing import Optional
app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
class WebSocketManager:
def __init__(self):
self.connections = {}
self.user_connections = {}
async def connect(self, websocket: WebSocket, user_id: str):
await websocket.accept()
self.connections[websocket] = user_id
self.user_connections.setdefault(user_id, set()).add(websocket)
async def disconnect(self, websocket: WebSocket):
user_id = self.connections.pop(websocket, None)
if user_id:
self.user_connections[user_id].remove(websocket)
async def send_personal_message(self, message: str, websocket: WebSocket):
await websocket.send_text(message)
async def broadcast(self, message: str, exclude: Optional[WebSocket] = None):
for connection in self.connections:
if connection != exclude:
await connection.send_text(message)
manager = WebSocketManager()
async def get_current_user(token: str):
try:
payload = jwt.decode(token, "secret_key", algorithms=["HS256"])
return payload.get("sub")
except:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication token",
)
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(
websocket: WebSocket,
client_id: int,
token: str = Depends(oauth2_scheme)
):
user = await get_current_user(token)
if not user:
await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
return
await manager.connect(websocket, user)
try:
while True:
data = await websocket.receive_text()
# Валидация входящих данных
if len(data) > 1000: # Пример простой валидации
await websocket.send_text("Message too long")
continue
# Обработка команд
if data.startswith("/"):
await handle_command(data, websocket, user)
continue
# Broadcast сообщения
await manager.broadcast(
f"User {user} says: {data}",
exclude=websocket
)
except Exception as e:
print(f"Error handling websocket: {e}")
finally:
await manager.disconnect(websocket)
async def handle_command(cmd: str, websocket: WebSocket, user: str):
"""Обработка специальных команд"""
if cmd == "/users":
active_users = list(manager.user_connections.keys())
await manager.send_personal_message(
f"Active users: {active_users}",
websocket
)
- Управление соединениями:
- Используйте менеджер соединений
- Правильно закрывайте соединения
- Имплементируйте механизм переподключения
class ConnectionManager:
def __init__(self):
self.active_connections: Dict[str, WebSocket] = {}
self.lock = asyncio.Lock()
async def connect(self, websocket: WebSocket, client_id: str):
async with self.lock:
await websocket.accept()
self.active_connections[client_id] = websocket
async def disconnect(self, client_id: str):
async with self.lock:
if client_id in self.active_connections:
await self.active_connections[client_id].close()
del self.active_connections[client_id]
- Обработка ошибок:
- Используйте try/except/finally
- Правильно закрывайте соединения при ошибках
- Логируйте ошибки
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
try:
await manager.connect(websocket)
while True:
try:
data = await websocket.receive_text()
# обработка данных
except WebSocketDisconnect:
logger.info("Client disconnected normally")
break
except Exception as e:
logger.error(f"Error handling message: {e}")
await websocket.close(code=1011)
break
finally:
await manager.disconnect(websocket)
- Heartbeat и Keep-alive:
async def heartbeat(websocket: WebSocket):
while True:
try:
await websocket.send_text("ping")
await asyncio.sleep(30)
except Exception:
break
class PubSubManager:
def __init__(self):
self.subscriptions: Dict[str, Set[WebSocket]] = defaultdict(set)
async def subscribe(self, topic: str, websocket: WebSocket):
self.subscriptions[topic].add(websocket)
async def unsubscribe(self, topic: str, websocket: WebSocket):
self.subscriptions[topic].remove(websocket)
async def publish(self, topic: str, message: str):
for websocket in self.subscriptions[topic]:
await websocket.send_text(message)
from fastapi import WebSocket
from datetime import datetime, timedelta
class RateLimiter:
def __init__(self, messages_per_second: int = 5):
self.rate_limit = messages_per_second
self.message_timestamps: Dict[WebSocket, List[datetime]] = defaultdict(list)
def is_rate_limited(self, websocket: WebSocket) -> bool:
now = datetime.now()
timestamps = self.message_timestamps[websocket]
# Удаляем старые timestamp'ы
timestamps = [ts for ts in timestamps if now - ts < timedelta(seconds=1)]
self.message_timestamps[websocket] = timestamps
if len(timestamps) >= self.rate_limit:
return True
self.message_timestamps[websocket].append(now)
return False
- Аутентификация:
async def get_token_from_cookie(websocket: WebSocket):
cookie = websocket.cookies.get("session")
if not cookie:
raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION)
return cookie
@app.websocket("/ws")
async def websocket_endpoint(
websocket: WebSocket,
token: str = Depends(get_token_from_cookie)
):
user = await authenticate_user(token)
if not user:
await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
return
# ...
- Валидация данных:
from pydantic import BaseModel, validator
class WSMessage(BaseModel):
type: str
content: str
@validator('content')
def content_must_be_safe(cls, v):
if '<script>' in v.lower():
raise ValueError('XSS attempt detected')
return v
async def validate_message(websocket: WebSocket):
data = await websocket.receive_text()
try:
message = WSMessage.parse_raw(data)
return message
except ValidationError:
await websocket.send_text("Invalid message format")
return None
from fastapi.testclient import TestClient
from fastapi.websockets import WebSocket
import pytest
def test_websocket_connection():
client = TestClient(app)
with client.websocket_connect("/ws") as websocket:
data = websocket.receive_text()
assert data == "Connected"
@pytest.mark.asyncio
async def test_connection_manager():
manager = ConnectionManager()
websocket = WebSocket()
await manager.connect(websocket)
assert len(manager.active_connections) == 1
await manager.disconnect(websocket)
assert len(manager.active_connections) == 0
import asyncio
import websockets
async def test_websocket_chat():
uri = "ws://localhost:8000/ws"
async with websockets.connect(uri) as websocket:
await websocket.send("Hello")
response = await websocket.recv()
assert "Hello" in response
async def test_multiple_clients():
uri = "ws://localhost:8000/ws"
async with websockets.connect(uri) as ws1, \
websockets.connect(uri) as ws2:
await ws1.send("Hi from client 1")
msg = await ws2.recv()
assert "client 1" in msg
-
Масштабирование:
- Используйте Redis для хранения состояния
- Применяйте горизонтальное масштабирование
- Используйте брокеры сообщений для коммуникации между узлами
-
Мониторинг:
- Отслеживайте количество активных соединений
- Мониторьте память и CPU
- Логируйте ошибки и важные события
-
Оптимизация:
- Используйте сжатие данных
- Минимизируйте размер сообщений
- Используйте пулы соединений
-
Документация:
- Документируйте протокол
- Опишите форматы сообщений
- Предоставьте примеры использования