- Введение
- Основы многопоточности во Flask
- Работа с SQLAlchemy в многопоточном режиме
- Примитивы синхронизации
- Демоны и фоновые задачи
- Обработка ошибок в потоках
- Многопроцессорность vs Многопоточность
- Асинхронный подход
- Рекомендации и лучшие практики
В этом руководстве мы рассмотрим различные аспекты многопоточного программирования с использованием Flask и SQLAlchemy. Мы изучим как базовые концепции, так и продвинутые техники, включая обработку deadlock'ов, создание демонов и правильную обработку ошибок.
from flask import Flask
from threading import Thread
import time
app = Flask(__name__)
# Глобальный счетчик для демонстрации
counter = 0
def background_task():
"""Фоновая задача, выполняющаяся в отдельном потоке"""
global counter
while True:
counter += 1
time.sleep(1)
@app.route('/')
def home():
return f'Текущее значение счетчика: {counter}'
if __name__ == '__main__':
# Запуск фоновой задачи в отдельном потоке
background_thread = Thread(target=background_task, daemon=True)
background_thread.start()
# Запуск Flask-приложения
app.run(debug=False, threaded=True)
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy.orm import scoped_session, sessionmaker
from threading import Thread
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///example.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
# Создаем базу данных
db = SQLAlchemy(app)
# Создаем модель для примера
class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False)
# Создаем фабрику сессий для потокобезопасного использования
Session = scoped_session(sessionmaker(bind=db.engine))
def worker_thread():
"""Рабочий поток для работы с базой данных"""
# Получаем новую сессию для этого потока
session = Session()
try:
# Выполняем операции с базой данных
new_user = User(username=f'user_{time.time()}')
session.add(new_user)
session.commit()
except Exception as e:
session.rollback()
print(f'Ошибка: {e}')
finally:
# Важно всегда закрывать сессию
session.close()
# Удаляем сессию из реестра
Session.remove()
@app.route('/create_user')
def create_user():
thread = Thread(target=worker_thread)
thread.start()
return 'Создание пользователя запущено в фоновом режиме'
from threading import Lock
# Создаем блокировку
data_lock = Lock()
shared_data = []
def safe_append(item):
"""Потокобезопасное добавление элемента в список"""
with data_lock:
shared_data.append(item)
# Здесь можно безопасно работать с shared_data
from threading import Lock
import threading
import time
# Создаем две блокировки
lock1 = Lock()
lock2 = Lock()
def thread1_function():
"""Функция, которая может привести к deadlock"""
print("Thread 1: Пытаемся получить lock1")
with lock1:
print("Thread 1: lock1 получен")
time.sleep(0.5) # Имитируем работу
print("Thread 1: Пытаемся получить lock2")
with lock2:
print("Thread 1: lock2 получен")
def thread2_function():
"""Функция, которая может привести к deadlock"""
print("Thread 2: Пытаемся получить lock2")
with lock2:
print("Thread 2: lock2 получен")
time.sleep(0.5) # Имитируем работу
print("Thread 2: Пытаемся получить lock1")
with lock1:
print("Thread 2: lock1 получен")
# Правильное решение - использование RLock или упорядочивание блокировок
def safe_thread_function():
"""Безопасная версия функции без риска deadlock"""
# Всегда получаем блокировки в одном и том же порядке
with lock1:
with lock2:
# Выполняем работу
pass
import threading
import time
import logging
class WatchDog(threading.Thread):
"""Демон-наблюдатель за состоянием приложения"""
def __init__(self):
super().__init__()
self.daemon = True # Делаем поток демоном
self.stopped = threading.Event()
self.health_checks = {}
def run(self):
while not self.stopped.is_set():
# Проверяем все зарегистрированные сервисы
for service_name, last_check in self.health_checks.items():
if time.time() - last_check > 60: # Порог - 1 минута
logging.warning(f'Сервис {service_name} не отвечает')
time.sleep(5)
def register_health_check(self, service_name):
"""Регистрация нового сервиса для мониторинга"""
self.health_checks[service_name] = time.time()
def update_health_check(self, service_name):
"""Обновление времени последней проверки"""
self.health_checks[service_name] = time.time()
def stop(self):
"""Остановка демона"""
self.stopped.set()
# Использование:
watchdog = WatchDog()
watchdog.start()
import sys
import threading
import traceback
import logging
class ThreadWithExceptionHandling(threading.Thread):
"""Поток с обработкой исключений"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.exception = None
def run(self):
try:
super().run()
except Exception as e:
self.exception = e
logging.error(f'Ошибка в потоке {self.name}: {e}')
logging.error(traceback.format_exc())
def join(self, timeout=None):
super().join(timeout)
if self.exception:
raise self.exception
# Пример использования:
def potentially_dangerous_function():
"""Функция, которая может вызвать исключение"""
raise ValueError("Произошла ошибка!")
def main():
thread = ThreadWithExceptionHandling(target=potentially_dangerous_function)
thread.start()
try:
thread.join()
except Exception as e:
logging.error(f'Поток завершился с ошибкой: {e}')
from multiprocessing import Process, Queue
import time
def cpu_bound_task(queue):
"""Задача с интенсивными вычислениями"""
result = 0
for i in range(10**7):
result += i
queue.put(result)
def run_in_process():
"""Запуск задачи в отдельном процессе"""
queue = Queue()
process = Process(target=cpu_bound_task, args=(queue,))
process.start()
result = queue.get() # Получаем результат
process.join()
return result
def run_in_thread():
"""Запуск задачи в отдельном потоке"""
result = []
thread = Thread(target=lambda: result.append(cpu_bound_task(Queue())))
thread.start()
thread.join()
return result[0]
# Сравнение производительности:
def compare_performance():
start = time.time()
process_result = run_in_process()
process_time = time.time() - start
start = time.time()
thread_result = run_in_thread()
thread_time = time.time() - start
print(f'Время выполнения в процессе: {process_time:.2f} сек')
print(f'Время выполнения в потоке: {thread_time:.2f} сек')
from quart import Quart # Асинхронный аналог Flask
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
import asyncio
app = Quart(__name__)
# Создаем асинхронный движок
engine = create_async_engine('postgresql+asyncpg://user:pass@localhost/db')
async_session = sessionmaker(engine, class_=AsyncSession)
@app.route('/async')
async def async_route():
async with async_session() as session:
# Асинхронный запрос к базе данных
result = await session.execute("SELECT * FROM users")
users = result.fetchall()
return {'users': [dict(user) for user in users]}
# Сравнение с многопоточным подходом:
@app.route('/threaded')
def threaded_route():
def db_operation():
with Session() as session:
return session.execute("SELECT * FROM users").fetchall()
thread = Thread(target=db_operation)
thread.start()
thread.join()
return {'users': [dict(user) for user in users]}
-
Выбор подхода:
- Используйте многопоточность для I/O-bound задач
- Используйте многопроцессорность для CPU-bound задач
- Используйте асинхронное программирование для масштабируемых веб-приложений
-
Работа с базой данных:
- Всегда используйте scoped_session для потокобезопасности
- Закрывайте сессии после использования
- Используйте контекстные менеджеры (with)
-
Безопасность:
- Избегайте глобальных переменных
- Используйте примитивы синхронизации
- Правильно обрабатывайте исключения
-
Производительность:
- Не создавайте слишком много потоков
- Используйте пулы потоков
- Следите за утечками памяти
-
Отладка:
- Используйте логирование
- Добавляйте уникальные идентификаторы для потоков
- Мониторьте состояние приложения
Плюсы:
- Легкое разделение ресурсов
- Эффективно для I/O-операций
- Меньшее потребление памяти
Минусы:
- GIL в Python
- Сложность отладки
- Риск race conditions
Плюсы:
- Высокая производительность
- Эффективное использование ресурсов
- Лучшая масштабируемость
Минусы:
- Сложность написания кода
- Необходимость специальных библиотек
- Сложность интеграции с синхронным кодом
from threading import Lock
# Создаем блокировку
data_lock = Lock()
shared_data = []
def safe_append(item):
"""Потокобезопасное добавление элемента в список"""
with data_lock:
shared_data.append(item)
# Здесь можно безопасно работать с shared_data
from threading import Semaphore, Thread
import time
# Создаем семафор, ограничивающий доступ до 3 потоков
pool_semaphore = Semaphore(3)
def worker(worker_id):
"""Работник, использующий ограниченный ресурс"""
with pool_semaphore:
print(f'Работник {worker_id} получил доступ к ресурсу')
time.sleep(2) # Имитация работы
print(f'Работник {worker_id} освободил ресурс')
# Пример использования семафора
def run_workers():
threads = []
for i in range(5):
t = Thread(target=worker, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()
from threading import Barrier, Thread
import time
# Создаем барьер для синхронизации 3 потоков
synchronization_barrier = Barrier(3)
def worker(worker_id, barrier):
"""Работник, который должен дождаться других перед продолжением"""
print(f'Работник {worker_id} начал подготовку')
time.sleep(worker_id) # Разное время подготовки
print(f'Работник {worker_id} готов и ждет других')
barrier.wait() # Ждем, пока все потоки достигнут этой точки
print(f'Работник {worker_id} продолжает работу после синхронизации')
# Пример использования барьера
def run_synchronized_workers():
threads = []
for i in range(3):
t = Thread(target=worker, args=(i, synchronization_barrier))
threads.append(t)
t.start()
for t in threads:
t.join()
from threading import Event, Thread
import time
# Создаем событие
start_event = Event()
def worker(worker_id, event):
"""Работник, ожидающий сигнала для начала работы"""
print(f'Работник {worker_id} готов и ждет сигнала')
event.wait() # Ждем установки события
print(f'Работник {worker_id} начал работу')
# Пример использования события
def coordinate_workers():
threads = []
for i in range(3):
t = Thread(target=worker, args=(i, start_event))
threads.append(t)
t.start()
print('Подготовка к работе...')
time.sleep(2)
print('Даем сигнал к началу работы')
start_event.set() # Сигнал всем потокам начать работу
for t in threads:
t.join()
from threading import Condition, Thread
import time
import queue
class BoundedQueue:
"""Очередь с ограниченным размером и условной синхронизацией"""
def __init__(self, size):
self.queue = queue.Queue(size)
self.condition = Condition()
self.max_size = size
def put(self, item):
with self.condition:
while self.queue.qsize() >= self.max_size:
print('Очередь полна, ожидаем освобождения места')
self.condition.wait()
self.queue.put(item)
print(f'Добавлен элемент: {item}')
self.condition.notify() # Уведомляем ожидающих потребителей
def get(self):
with self.condition:
while self.queue.empty():
print('Очередь пуста, ожидаем новых элементов')
self.condition.wait()
item = self.queue.get()
print(f'Получен элемент: {item}')
self.condition.notify() # Уведомляем ожидающих производителей
return item
# Пример использования условной синхронизации
def producer(queue):
"""Производитель элементов"""
for i in range(5):
time.sleep(1) # Имитация работы
queue.put(i)
def consumer(queue):
"""Потребитель элементов"""
for _ in range(5):
time.sleep(2) # Имитация работы
queue.get()
def run_producer_consumer():
bounded_queue = BoundedQueue(2) # Очередь размером 2
prod = Thread(target=producer, args=(bounded_queue,))
cons = Thread(target=consumer, args=(bounded_queue,))
prod.start()
cons.start()
prod.join()
cons.join()
from threading import RLock
class SafeCounter:
"""Потокобезопасный счетчик с возможностью повторного входа"""
def __init__(self):
self._lock = RLock() # Реентерабельная блокировка
self._count = 0
def increment(self):
with self._lock:
self._count += 1
self._update_statistics() # Метод, который также использует блокировку
def _update_statistics(self):
with self._lock: # Можно повторно войти в блокировку
# Обновление статистики
pass
@property
def value(self):
with self._lock:
return self._count
from threading import Thread, Lock, Semaphore, Event, Barrier
import time
import logging
class ResourcePool:
"""Пул ресурсов с комплексной синхронизацией"""
def __init__(self, resource_count):
self.resources = list(range(resource_count))
self.lock = Lock() # Для защиты списка ресурсов
self.semaphore = Semaphore(resource_count) # Для контроля количества одновременных доступов
self.ready_event = Event() # Для сигнализации о готовности пула
self.sync_barrier = Barrier(resource_count) # Для синхронизации использования ресурсов
def initialize(self):
"""Инициализация пула ресурсов"""
with self.lock:
print('Инициализация пула ресурсов...')
time.sleep(1) # Имитация подготовки
self.ready_event.set()
def get_resource(self):
"""Получение ресурса из пула"""
self.ready_event.wait() # Ждем инициализации
self.semaphore.acquire() # Запрашиваем доступ к ресурсу
with self.lock:
if not self.resources:
self.semaphore.release()
raise RuntimeError('Нет доступных ресурсов')
return self.resources.pop()
def release_resource(self, resource_id):
"""Возврат ресурса в пул"""
with self.lock:
self.resources.append(resource_id)
self.semaphore.release()
def worker(pool, worker_id):
"""Работник, использующий ресурс из пула"""
try:
resource = pool.get_resource()
print(f'Работник {worker_id} получил ресурс {resource}')
time.sleep(1) # Имитация работы
pool.release_resource(resource)
print(f'Работник {worker_id} освободил ресурс {resource}')
# Ждем, пока все закончат первую фазу
pool.sync_barrier.wait()
print(f'Работник {worker_id} прошел барьер')
except Exception as e:
logging.error(f'Ошибка в работнике {worker_id}: {e}')
# Пример использования комплексной синхронизации
def run_complex_example():
pool = ResourcePool(3)
# Запускаем инициализацию в отдельном потоке
init_thread = Thread(target=pool.initialize)
init_thread.start()
# Запускаем работников
workers = []
for i in range(3):
t = Thread(target=worker, args=(pool, i))
workers.append(t)
t.start()
# Ждем завершения всех потоков
init_thread.join()
for t in workers:
t.join()