Skip to content

Instantly share code, notes, and snippets.

@mvandermeulen
Forked from Miron-Anosov/websocket.md
Created June 27, 2025 17:35
Show Gist options
  • Save mvandermeulen/bd804baad04d75eaa80fc5148b77f5e9 to your computer and use it in GitHub Desktop.
Save mvandermeulen/bd804baad04d75eaa80fc5148b77f5e9 to your computer and use it in GitHub Desktop.

WebSocket Guide: Theory and Implementation with FastAPI

Table of Contents

Theoretical Background

WebSocket - это протокол связи, обеспечивающий полнодуплексный канал связи через единое TCP-соединение. Основные характеристики:

  • Постоянное соединение между клиентом и сервером
  • Двунаправленная передача данных
  • Низкая задержка
  • Поддержка текстовых и бинарных данных

Процесс установки соединения:

  1. Клиент отправляет HTTP запрос с заголовком Upgrade: websocket
  2. Сервер отвечает с кодом 101 (Switching Protocols)
  3. TCP соединение остается открытым для WebSocket коммуникации

WebSocket vs HTTP

Характеристика WebSocket HTTP
Соединение Постоянное Временное
Направление Двунаправленное Запрос-ответ
Overhead Минимальный после установки Каждый запрос имеет заголовки
Use Cases Чаты, real-time данные, игры CRUD операции, API

FastAPI WebSocket Implementation

Базовый пример:

from fastapi import FastAPI, WebSocket
from typing import List

app = FastAPI()

class ConnectionManager:
    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    async def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def broadcast(self, message: str):
        for connection in self.active_connections:
            await connection.send_text(message)

manager = ConnectionManager()

@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            await manager.broadcast(f"Client #{client_id}: {data}")
    except Exception as e:
        print(f"Error: {e}")
    finally:
        await manager.disconnect(websocket)

Расширенный пример с аутентификацией и валидацией:

from fastapi import FastAPI, WebSocket, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from pydantic import BaseModel
import jwt
from typing import Optional

app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

class WebSocketManager:
    def __init__(self):
        self.connections = {}
        self.user_connections = {}

    async def connect(self, websocket: WebSocket, user_id: str):
        await websocket.accept()
        self.connections[websocket] = user_id
        self.user_connections.setdefault(user_id, set()).add(websocket)

    async def disconnect(self, websocket: WebSocket):
        user_id = self.connections.pop(websocket, None)
        if user_id:
            self.user_connections[user_id].remove(websocket)

    async def send_personal_message(self, message: str, websocket: WebSocket):
        await websocket.send_text(message)

    async def broadcast(self, message: str, exclude: Optional[WebSocket] = None):
        for connection in self.connections:
            if connection != exclude:
                await connection.send_text(message)

manager = WebSocketManager()

async def get_current_user(token: str):
    try:
        payload = jwt.decode(token, "secret_key", algorithms=["HS256"])
        return payload.get("sub")
    except:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication token",
        )

@app.websocket("/ws/{client_id}")
async def websocket_endpoint(
    websocket: WebSocket,
    client_id: int,
    token: str = Depends(oauth2_scheme)
):
    user = await get_current_user(token)
    if not user:
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
        return

    await manager.connect(websocket, user)
    try:
        while True:
            data = await websocket.receive_text()
            # Валидация входящих данных
            if len(data) > 1000:  # Пример простой валидации
                await websocket.send_text("Message too long")
                continue
            
            # Обработка команд
            if data.startswith("/"):
                await handle_command(data, websocket, user)
                continue

            # Broadcast сообщения
            await manager.broadcast(
                f"User {user} says: {data}",
                exclude=websocket
            )
    except Exception as e:
        print(f"Error handling websocket: {e}")
    finally:
        await manager.disconnect(websocket)

async def handle_command(cmd: str, websocket: WebSocket, user: str):
    """Обработка специальных команд"""
    if cmd == "/users":
        active_users = list(manager.user_connections.keys())
        await manager.send_personal_message(
            f"Active users: {active_users}",
            websocket
        )

Best Practices

  1. Управление соединениями:
    • Используйте менеджер соединений
    • Правильно закрывайте соединения
    • Имплементируйте механизм переподключения
class ConnectionManager:
    def __init__(self):
        self.active_connections: Dict[str, WebSocket] = {}
        self.lock = asyncio.Lock()

    async def connect(self, websocket: WebSocket, client_id: str):
        async with self.lock:
            await websocket.accept()
            self.active_connections[client_id] = websocket

    async def disconnect(self, client_id: str):
        async with self.lock:
            if client_id in self.active_connections:
                await self.active_connections[client_id].close()
                del self.active_connections[client_id]
  1. Обработка ошибок:
    • Используйте try/except/finally
    • Правильно закрывайте соединения при ошибках
    • Логируйте ошибки
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    try:
        await manager.connect(websocket)
        while True:
            try:
                data = await websocket.receive_text()
                # обработка данных
            except WebSocketDisconnect:
                logger.info("Client disconnected normally")
                break
            except Exception as e:
                logger.error(f"Error handling message: {e}")
                await websocket.close(code=1011)
                break
    finally:
        await manager.disconnect(websocket)
  1. Heartbeat и Keep-alive:
async def heartbeat(websocket: WebSocket):
    while True:
        try:
            await websocket.send_text("ping")
            await asyncio.sleep(30)
        except Exception:
            break

Common Patterns

Pub/Sub Pattern:

class PubSubManager:
    def __init__(self):
        self.subscriptions: Dict[str, Set[WebSocket]] = defaultdict(set)

    async def subscribe(self, topic: str, websocket: WebSocket):
        self.subscriptions[topic].add(websocket)

    async def unsubscribe(self, topic: str, websocket: WebSocket):
        self.subscriptions[topic].remove(websocket)

    async def publish(self, topic: str, message: str):
        for websocket in self.subscriptions[topic]:
            await websocket.send_text(message)

Rate Limiting:

from fastapi import WebSocket
from datetime import datetime, timedelta

class RateLimiter:
    def __init__(self, messages_per_second: int = 5):
        self.rate_limit = messages_per_second
        self.message_timestamps: Dict[WebSocket, List[datetime]] = defaultdict(list)

    def is_rate_limited(self, websocket: WebSocket) -> bool:
        now = datetime.now()
        timestamps = self.message_timestamps[websocket]
        
        # Удаляем старые timestamp'ы
        timestamps = [ts for ts in timestamps if now - ts < timedelta(seconds=1)]
        self.message_timestamps[websocket] = timestamps

        if len(timestamps) >= self.rate_limit:
            return True

        self.message_timestamps[websocket].append(now)
        return False

Security Considerations

  1. Аутентификация:
async def get_token_from_cookie(websocket: WebSocket):
    cookie = websocket.cookies.get("session")
    if not cookie:
        raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION)
    return cookie

@app.websocket("/ws")
async def websocket_endpoint(
    websocket: WebSocket,
    token: str = Depends(get_token_from_cookie)
):
    user = await authenticate_user(token)
    if not user:
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
        return
    # ...
  1. Валидация данных:
from pydantic import BaseModel, validator

class WSMessage(BaseModel):
    type: str
    content: str
    
    @validator('content')
    def content_must_be_safe(cls, v):
        if '<script>' in v.lower():
            raise ValueError('XSS attempt detected')
        return v

async def validate_message(websocket: WebSocket):
    data = await websocket.receive_text()
    try:
        message = WSMessage.parse_raw(data)
        return message
    except ValidationError:
        await websocket.send_text("Invalid message format")
        return None

Testing

Unit Tests:

from fastapi.testclient import TestClient
from fastapi.websockets import WebSocket
import pytest

def test_websocket_connection():
    client = TestClient(app)
    with client.websocket_connect("/ws") as websocket:
        data = websocket.receive_text()
        assert data == "Connected"

@pytest.mark.asyncio
async def test_connection_manager():
    manager = ConnectionManager()
    websocket = WebSocket()
    
    await manager.connect(websocket)
    assert len(manager.active_connections) == 1
    
    await manager.disconnect(websocket)
    assert len(manager.active_connections) == 0

Integration Tests:

import asyncio
import websockets

async def test_websocket_chat():
    uri = "ws://localhost:8000/ws"
    async with websockets.connect(uri) as websocket:
        await websocket.send("Hello")
        response = await websocket.recv()
        assert "Hello" in response

async def test_multiple_clients():
    uri = "ws://localhost:8000/ws"
    async with websockets.connect(uri) as ws1, \
              websockets.connect(uri) as ws2:
        await ws1.send("Hi from client 1")
        msg = await ws2.recv()
        assert "client 1" in msg

Дополнительные советы

  1. Масштабирование:

    • Используйте Redis для хранения состояния
    • Применяйте горизонтальное масштабирование
    • Используйте брокеры сообщений для коммуникации между узлами
  2. Мониторинг:

    • Отслеживайте количество активных соединений
    • Мониторьте память и CPU
    • Логируйте ошибки и важные события
  3. Оптимизация:

    • Используйте сжатие данных
    • Минимизируйте размер сообщений
    • Используйте пулы соединений
  4. Документация:

    • Документируйте протокол
    • Опишите форматы сообщений
    • Предоставьте примеры использования
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment