Skip to content

Instantly share code, notes, and snippets.

@Miron-Anosov
Created January 29, 2025 12:25
Show Gist options
  • Save Miron-Anosov/b7786f7e26adb3dc8181396a498fa812 to your computer and use it in GitHub Desktop.
Save Miron-Anosov/b7786f7e26adb3dc8181396a498fa812 to your computer and use it in GitHub Desktop.

Руководство по многопоточности во Flask и SQLAlchemy

Содержание

  1. Введение
  2. Основы многопоточности во Flask
  3. Работа с SQLAlchemy в многопоточном режиме
  4. Примитивы синхронизации
  5. Демоны и фоновые задачи
  6. Обработка ошибок в потоках
  7. Многопроцессорность vs Многопоточность
  8. Асинхронный подход
  9. Рекомендации и лучшие практики

Введение

В этом руководстве мы рассмотрим различные аспекты многопоточного программирования с использованием Flask и SQLAlchemy. Мы изучим как базовые концепции, так и продвинутые техники, включая обработку deadlock'ов, создание демонов и правильную обработку ошибок.

Основы многопоточности во Flask

Простой пример многопоточного веб-сервера

from flask import Flask
from threading import Thread
import time

app = Flask(__name__)

# Глобальный счетчик для демонстрации
counter = 0

def background_task():
    """Фоновая задача, выполняющаяся в отдельном потоке"""
    global counter
    while True:
        counter += 1
        time.sleep(1)

@app.route('/')
def home():
    return f'Текущее значение счетчика: {counter}'

if __name__ == '__main__':
    # Запуск фоновой задачи в отдельном потоке
    background_thread = Thread(target=background_task, daemon=True)
    background_thread.start()
    
    # Запуск Flask-приложения
    app.run(debug=False, threaded=True)

Работа с SQLAlchemy в многопоточном режиме

Правильная настройка сессий

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy.orm import scoped_session, sessionmaker
from threading import Thread

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///example.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

# Создаем базу данных
db = SQLAlchemy(app)

# Создаем модель для примера
class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)

# Создаем фабрику сессий для потокобезопасного использования
Session = scoped_session(sessionmaker(bind=db.engine))

def worker_thread():
    """Рабочий поток для работы с базой данных"""
    # Получаем новую сессию для этого потока
    session = Session()
    try:
        # Выполняем операции с базой данных
        new_user = User(username=f'user_{time.time()}')
        session.add(new_user)
        session.commit()
    except Exception as e:
        session.rollback()
        print(f'Ошибка: {e}')
    finally:
        # Важно всегда закрывать сессию
        session.close()
        # Удаляем сессию из реестра
        Session.remove()

@app.route('/create_user')
def create_user():
    thread = Thread(target=worker_thread)
    thread.start()
    return 'Создание пользователя запущено в фоновом режиме'

Примитивы синхронизации

Пример использования Lock

from threading import Lock

# Создаем блокировку
data_lock = Lock()
shared_data = []

def safe_append(item):
    """Потокобезопасное добавление элемента в список"""
    with data_lock:
        shared_data.append(item)
        # Здесь можно безопасно работать с shared_data

Пример deadlock

from threading import Lock
import threading
import time

# Создаем две блокировки
lock1 = Lock()
lock2 = Lock()

def thread1_function():
    """Функция, которая может привести к deadlock"""
    print("Thread 1: Пытаемся получить lock1")
    with lock1:
        print("Thread 1: lock1 получен")
        time.sleep(0.5)  # Имитируем работу
        print("Thread 1: Пытаемся получить lock2")
        with lock2:
            print("Thread 1: lock2 получен")

def thread2_function():
    """Функция, которая может привести к deadlock"""
    print("Thread 2: Пытаемся получить lock2")
    with lock2:
        print("Thread 2: lock2 получен")
        time.sleep(0.5)  # Имитируем работу
        print("Thread 2: Пытаемся получить lock1")
        with lock1:
            print("Thread 2: lock1 получен")

# Правильное решение - использование RLock или упорядочивание блокировок
def safe_thread_function():
    """Безопасная версия функции без риска deadlock"""
    # Всегда получаем блокировки в одном и том же порядке
    with lock1:
        with lock2:
            # Выполняем работу
            pass

Демоны и фоновые задачи

Создание демона-наблюдателя

import threading
import time
import logging

class WatchDog(threading.Thread):
    """Демон-наблюдатель за состоянием приложения"""
    
    def __init__(self):
        super().__init__()
        self.daemon = True  # Делаем поток демоном
        self.stopped = threading.Event()
        self.health_checks = {}
        
    def run(self):
        while not self.stopped.is_set():
            # Проверяем все зарегистрированные сервисы
            for service_name, last_check in self.health_checks.items():
                if time.time() - last_check > 60:  # Порог - 1 минута
                    logging.warning(f'Сервис {service_name} не отвечает')
            time.sleep(5)
    
    def register_health_check(self, service_name):
        """Регистрация нового сервиса для мониторинга"""
        self.health_checks[service_name] = time.time()
    
    def update_health_check(self, service_name):
        """Обновление времени последней проверки"""
        self.health_checks[service_name] = time.time()
    
    def stop(self):
        """Остановка демона"""
        self.stopped.set()

# Использование:
watchdog = WatchDog()
watchdog.start()

Обработка ошибок в потоках

Перехват и логирование ошибок

import sys
import threading
import traceback
import logging

class ThreadWithExceptionHandling(threading.Thread):
    """Поток с обработкой исключений"""
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.exception = None
    
    def run(self):
        try:
            super().run()
        except Exception as e:
            self.exception = e
            logging.error(f'Ошибка в потоке {self.name}: {e}')
            logging.error(traceback.format_exc())
    
    def join(self, timeout=None):
        super().join(timeout)
        if self.exception:
            raise self.exception

# Пример использования:
def potentially_dangerous_function():
    """Функция, которая может вызвать исключение"""
    raise ValueError("Произошла ошибка!")

def main():
    thread = ThreadWithExceptionHandling(target=potentially_dangerous_function)
    thread.start()
    try:
        thread.join()
    except Exception as e:
        logging.error(f'Поток завершился с ошибкой: {e}')

Многопроцессорность vs Многопоточность

Пример использования multiprocessing

from multiprocessing import Process, Queue
import time

def cpu_bound_task(queue):
    """Задача с интенсивными вычислениями"""
    result = 0
    for i in range(10**7):
        result += i
    queue.put(result)

def run_in_process():
    """Запуск задачи в отдельном процессе"""
    queue = Queue()
    process = Process(target=cpu_bound_task, args=(queue,))
    process.start()
    result = queue.get()  # Получаем результат
    process.join()
    return result

def run_in_thread():
    """Запуск задачи в отдельном потоке"""
    result = []
    thread = Thread(target=lambda: result.append(cpu_bound_task(Queue())))
    thread.start()
    thread.join()
    return result[0]

# Сравнение производительности:
def compare_performance():
    start = time.time()
    process_result = run_in_process()
    process_time = time.time() - start
    
    start = time.time()
    thread_result = run_in_thread()
    thread_time = time.time() - start
    
    print(f'Время выполнения в процессе: {process_time:.2f} сек')
    print(f'Время выполнения в потоке: {thread_time:.2f} сек')

Асинхронный подход

Пример асинхронного Flask-приложения

from quart import Quart  # Асинхронный аналог Flask
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
import asyncio

app = Quart(__name__)

# Создаем асинхронный движок
engine = create_async_engine('postgresql+asyncpg://user:pass@localhost/db')
async_session = sessionmaker(engine, class_=AsyncSession)

@app.route('/async')
async def async_route():
    async with async_session() as session:
        # Асинхронный запрос к базе данных
        result = await session.execute("SELECT * FROM users")
        users = result.fetchall()
        return {'users': [dict(user) for user in users]}

# Сравнение с многопоточным подходом:
@app.route('/threaded')
def threaded_route():
    def db_operation():
        with Session() as session:
            return session.execute("SELECT * FROM users").fetchall()
    
    thread = Thread(target=db_operation)
    thread.start()
    thread.join()
    return {'users': [dict(user) for user in users]}

Рекомендации и лучшие практики

  1. Выбор подхода:

    • Используйте многопоточность для I/O-bound задач
    • Используйте многопроцессорность для CPU-bound задач
    • Используйте асинхронное программирование для масштабируемых веб-приложений
  2. Работа с базой данных:

    • Всегда используйте scoped_session для потокобезопасности
    • Закрывайте сессии после использования
    • Используйте контекстные менеджеры (with)
  3. Безопасность:

    • Избегайте глобальных переменных
    • Используйте примитивы синхронизации
    • Правильно обрабатывайте исключения
  4. Производительность:

    • Не создавайте слишком много потоков
    • Используйте пулы потоков
    • Следите за утечками памяти
  5. Отладка:

    • Используйте логирование
    • Добавляйте уникальные идентификаторы для потоков
    • Мониторьте состояние приложения

Минусы и плюсы разных подходов

Многопоточность

Плюсы:

  • Легкое разделение ресурсов
  • Эффективно для I/O-операций
  • Меньшее потребление памяти

Минусы:

  • GIL в Python
  • Сложность отладки
  • Риск race conditions

Асинхронность

Плюсы:

  • Высокая производительность
  • Эффективное использование ресурсов
  • Лучшая масштабируемость

Минусы:

  • Сложность написания кода
  • Необходимость специальных библиотек
  • Сложность интеграции с синхронным кодом

Примитивы синхронизации

Lock (Блокировка)

from threading import Lock

# Создаем блокировку
data_lock = Lock()
shared_data = []

def safe_append(item):
    """Потокобезопасное добавление элемента в список"""
    with data_lock:
        shared_data.append(item)
        # Здесь можно безопасно работать с shared_data

Semaphore (Семафор)

from threading import Semaphore, Thread
import time

# Создаем семафор, ограничивающий доступ до 3 потоков
pool_semaphore = Semaphore(3)

def worker(worker_id):
    """Работник, использующий ограниченный ресурс"""
    with pool_semaphore:
        print(f'Работник {worker_id} получил доступ к ресурсу')
        time.sleep(2)  # Имитация работы
        print(f'Работник {worker_id} освободил ресурс')

# Пример использования семафора
def run_workers():
    threads = []
    for i in range(5):
        t = Thread(target=worker, args=(i,))
        threads.append(t)
        t.start()
    
    for t in threads:
        t.join()

Barrier (Барьер)

from threading import Barrier, Thread
import time

# Создаем барьер для синхронизации 3 потоков
synchronization_barrier = Barrier(3)

def worker(worker_id, barrier):
    """Работник, который должен дождаться других перед продолжением"""
    print(f'Работник {worker_id} начал подготовку')
    time.sleep(worker_id)  # Разное время подготовки
    
    print(f'Работник {worker_id} готов и ждет других')
    barrier.wait()  # Ждем, пока все потоки достигнут этой точки
    
    print(f'Работник {worker_id} продолжает работу после синхронизации')

# Пример использования барьера
def run_synchronized_workers():
    threads = []
    for i in range(3):
        t = Thread(target=worker, args=(i, synchronization_barrier))
        threads.append(t)
        t.start()
    
    for t in threads:
        t.join()

Event (Событие)

from threading import Event, Thread
import time

# Создаем событие
start_event = Event()

def worker(worker_id, event):
    """Работник, ожидающий сигнала для начала работы"""
    print(f'Работник {worker_id} готов и ждет сигнала')
    event.wait()  # Ждем установки события
    print(f'Работник {worker_id} начал работу')

# Пример использования события
def coordinate_workers():
    threads = []
    for i in range(3):
        t = Thread(target=worker, args=(i, start_event))
        threads.append(t)
        t.start()
    
    print('Подготовка к работе...')
    time.sleep(2)
    print('Даем сигнал к началу работы')
    start_event.set()  # Сигнал всем потокам начать работу
    
    for t in threads:
        t.join()

Condition (Условие)

from threading import Condition, Thread
import time
import queue

class BoundedQueue:
    """Очередь с ограниченным размером и условной синхронизацией"""
    
    def __init__(self, size):
        self.queue = queue.Queue(size)
        self.condition = Condition()
        self.max_size = size
    
    def put(self, item):
        with self.condition:
            while self.queue.qsize() >= self.max_size:
                print('Очередь полна, ожидаем освобождения места')
                self.condition.wait()
            
            self.queue.put(item)
            print(f'Добавлен элемент: {item}')
            self.condition.notify()  # Уведомляем ожидающих потребителей
    
    def get(self):
        with self.condition:
            while self.queue.empty():
                print('Очередь пуста, ожидаем новых элементов')
                self.condition.wait()
            
            item = self.queue.get()
            print(f'Получен элемент: {item}')
            self.condition.notify()  # Уведомляем ожидающих производителей
            return item

# Пример использования условной синхронизации
def producer(queue):
    """Производитель элементов"""
    for i in range(5):
        time.sleep(1)  # Имитация работы
        queue.put(i)

def consumer(queue):
    """Потребитель элементов"""
    for _ in range(5):
        time.sleep(2)  # Имитация работы
        queue.get()

def run_producer_consumer():
    bounded_queue = BoundedQueue(2)  # Очередь размером 2
    
    prod = Thread(target=producer, args=(bounded_queue,))
    cons = Thread(target=consumer, args=(bounded_queue,))
    
    prod.start()
    cons.start()
    
    prod.join()
    cons.join()

RLock (Реентерабельная блокировка)

from threading import RLock

class SafeCounter:
    """Потокобезопасный счетчик с возможностью повторного входа"""
    
    def __init__(self):
        self._lock = RLock()  # Реентерабельная блокировка
        self._count = 0
    
    def increment(self):
        with self._lock:
            self._count += 1
            self._update_statistics()  # Метод, который также использует блокировку
    
    def _update_statistics(self):
        with self._lock:  # Можно повторно войти в блокировку
            # Обновление статистики
            pass
    
    @property
    def value(self):
        with self._lock:
            return self._count

Практический пример использования разных примитивов

from threading import Thread, Lock, Semaphore, Event, Barrier
import time
import logging

class ResourcePool:
    """Пул ресурсов с комплексной синхронизацией"""
    
    def __init__(self, resource_count):
        self.resources = list(range(resource_count))
        self.lock = Lock()  # Для защиты списка ресурсов
        self.semaphore = Semaphore(resource_count)  # Для контроля количества одновременных доступов
        self.ready_event = Event()  # Для сигнализации о готовности пула
        self.sync_barrier = Barrier(resource_count)  # Для синхронизации использования ресурсов
        
    def initialize(self):
        """Инициализация пула ресурсов"""
        with self.lock:
            print('Инициализация пула ресурсов...')
            time.sleep(1)  # Имитация подготовки
            self.ready_event.set()
    
    def get_resource(self):
        """Получение ресурса из пула"""
        self.ready_event.wait()  # Ждем инициализации
        self.semaphore.acquire()  # Запрашиваем доступ к ресурсу
        
        with self.lock:
            if not self.resources:
                self.semaphore.release()
                raise RuntimeError('Нет доступных ресурсов')
            return self.resources.pop()
    
    def release_resource(self, resource_id):
        """Возврат ресурса в пул"""
        with self.lock:
            self.resources.append(resource_id)
        self.semaphore.release()

def worker(pool, worker_id):
    """Работник, использующий ресурс из пула"""
    try:
        resource = pool.get_resource()
        print(f'Работник {worker_id} получил ресурс {resource}')
        time.sleep(1)  # Имитация работы
        pool.release_resource(resource)
        print(f'Работник {worker_id} освободил ресурс {resource}')
        
        # Ждем, пока все закончат первую фазу
        pool.sync_barrier.wait()
        print(f'Работник {worker_id} прошел барьер')
        
    except Exception as e:
        logging.error(f'Ошибка в работнике {worker_id}: {e}')

# Пример использования комплексной синхронизации
def run_complex_example():
    pool = ResourcePool(3)
    
    # Запускаем инициализацию в отдельном потоке
    init_thread = Thread(target=pool.initialize)
    init_thread.start()
    
    # Запускаем работников
    workers = []
    for i in range(3):
        t = Thread(target=worker, args=(pool, i))
        workers.append(t)
        t.start()
    
    # Ждем завершения всех потоков
    init_thread.join()
    for t in workers:
        t.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment