Created
November 15, 2024 18:59
-
-
Save WomB0ComB0/264877ca1c817108ade93ef6b4f38f14 to your computer and use it in GitHub Desktop.
Python multi-threading template (will apply opinionated changes soon)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import threading | |
import queue | |
import concurrent.futures | |
import logging | |
from typing import List, Callable, Any | |
from dataclasses import dataclass | |
from datetime import datetime | |
import time | |
# Configure logging | |
logging.basicConfig( | |
level=logging.INFO, | |
format='%(asctime)s - %(threadName)s - %(levelname)s - %(message)s' | |
) | |
@dataclass | |
class Task: | |
"""Represents a task to be processed by worker threads.""" | |
id: int | |
data: Any | |
created_at: datetime = datetime.now() | |
class ThreadPool: | |
"""A thread pool implementation with work queue and result handling.""" | |
def __init__(self, num_threads: int = 4, queue_size: int = 100): | |
self.queue = queue.Queue(maxsize=queue_size) | |
self.results = queue.Queue() | |
self.lock = threading.Lock() | |
self.event = threading.Event() | |
self.num_threads = num_threads | |
self.workers: List[threading.Thread] = [] | |
self._initialize_workers() | |
def _initialize_workers(self): | |
"""Initialize worker threads.""" | |
for i in range(self.num_threads): | |
worker = threading.Thread( | |
target=self._worker_thread, | |
name=f"Worker-{i}", | |
daemon=True | |
) | |
self.workers.append(worker) | |
worker.start() | |
def _worker_thread(self): | |
"""Worker thread function that processes tasks from the queue.""" | |
while not self.event.is_set(): | |
try: | |
task = self.queue.get(timeout=1.0) # 1 second timeout | |
try: | |
result = self._process_task(task) | |
with self.lock: | |
self.results.put((task.id, result, None)) # (task_id, result, error) | |
except Exception as e: | |
logging.error(f"Error processing task {task.id}: {str(e)}") | |
with self.lock: | |
self.results.put((task.id, None, str(e))) | |
finally: | |
self.queue.task_done() | |
except queue.Empty: | |
continue | |
except Exception as e: | |
logging.error(f"Worker thread error: {str(e)}") | |
def _process_task(self, task: Task) -> Any: | |
""" | |
Process a single task. Override this method in subclass. | |
""" | |
# Example processing | |
time.sleep(0.1) # Simulate work | |
return f"Processed task {task.id} with data {task.data}" | |
def submit_task(self, task: Task) -> None: | |
"""Submit a task to the thread pool.""" | |
self.queue.put(task) | |
def shutdown(self, wait: bool = True): | |
"""Shutdown the thread pool.""" | |
self.event.set() | |
if wait: | |
for worker in self.workers: | |
worker.join() | |
def get_results(self) -> List[tuple]: | |
"""Get all available results.""" | |
results = [] | |
while not self.results.empty(): | |
results.append(self.results.get()) | |
return results | |
# Example usage | |
def example_usage(): | |
# Create a custom thread pool | |
pool = ThreadPool(num_threads=4) | |
try: | |
# Submit some tasks | |
for i in range(10): | |
task = Task(id=i, data=f"Sample data {i}") | |
pool.submit_task(task) | |
# Wait for all tasks to complete | |
pool.queue.join() | |
# Get and print results | |
results = pool.get_results() | |
for task_id, result, error in results: | |
if error: | |
logging.error(f"Task {task_id} failed: {error}") | |
else: | |
logging.info(f"Task {task_id} result: {result}") | |
finally: | |
# Shutdown the pool | |
pool.shutdown() | |
# Alternative example using concurrent.futures | |
def concurrent_futures_example(): | |
def worker_function(x: int) -> int: | |
time.sleep(0.1) # Simulate work | |
return x * x | |
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: | |
futures = [executor.submit(worker_function, i) for i in range(10)] | |
for future in concurrent.futures.as_completed(futures): | |
try: | |
result = future.result() | |
logging.info(f"Got result: {result}") | |
except Exception as e: | |
logging.error(f"Task failed: {str(e)}") | |
if __name__ == "__main__": | |
# Run the custom thread pool example | |
example_usage() | |
# Run the concurrent.futures example | |
concurrent_futures_example() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment