Skip to content

Instantly share code, notes, and snippets.

@WomB0ComB0
Created November 15, 2024 18:59
Show Gist options
  • Save WomB0ComB0/264877ca1c817108ade93ef6b4f38f14 to your computer and use it in GitHub Desktop.
Save WomB0ComB0/264877ca1c817108ade93ef6b4f38f14 to your computer and use it in GitHub Desktop.
Python multi-threading template (will apply opinionated changes soon)
import concurrent.futures
import logging
import queue
import threading
import time
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Callable, List
# Configure logging
# Root-logger setup: INFO level, with %(threadName)s in the format so log
# lines emitted by different worker threads can be told apart.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(threadName)s - %(levelname)s - %(message)s'
)
@dataclass
class Task:
    """Represents a task to be processed by worker threads."""
    id: int    # unique task identifier, echoed back in result tuples
    data: Any  # arbitrary payload handed to the worker
    # BUG FIX: the original used `created_at: datetime = datetime.now()`,
    # which evaluates datetime.now() once at class-definition time, so every
    # Task instance shared the same timestamp. default_factory re-evaluates
    # per instance, giving each task its true creation time.
    created_at: datetime = field(default_factory=datetime.now)
class ThreadPool:
    """A thread pool implementation with work queue and result handling.

    Daemon worker threads pull Task objects off a bounded work queue,
    process them via ``_process_task`` (override in a subclass), and push
    ``(task_id, result, error)`` tuples onto ``self.results``.
    """

    def __init__(self, num_threads: int = 4, queue_size: int = 100):
        """Create the pool and start its workers immediately.

        Args:
            num_threads: number of worker threads to spawn.
            queue_size: max pending tasks before submit_task blocks.
        """
        self.queue = queue.Queue(maxsize=queue_size)  # pending tasks
        self.results = queue.Queue()  # (task_id, result, error) tuples
        # NOTE(review): queue.Queue is already thread-safe, so no lock is
        # needed around put/get; the attribute is kept for subclass use and
        # backward compatibility.
        self.lock = threading.Lock()
        self.event = threading.Event()  # set() signals workers to stop
        self.num_threads = num_threads
        self.workers: List[threading.Thread] = []
        self._initialize_workers()

    def _initialize_workers(self):
        """Start ``num_threads`` daemon worker threads."""
        for i in range(self.num_threads):
            worker = threading.Thread(
                target=self._worker_thread,
                name=f"Worker-{i}",
                daemon=True,  # daemons: process exit isn't blocked by idle workers
            )
            self.workers.append(worker)
            worker.start()

    def _worker_thread(self):
        """Worker loop: process tasks until the shutdown event is set."""
        while not self.event.is_set():
            try:
                # Short timeout so the loop periodically re-checks the
                # shutdown event instead of blocking forever when idle.
                task = self.queue.get(timeout=1.0)
                try:
                    result = self._process_task(task)
                    # queue.Queue is thread-safe; the original's extra
                    # `with self.lock:` around put() was redundant.
                    self.results.put((task.id, result, None))
                except Exception as e:
                    logging.error(f"Error processing task {task.id}: {str(e)}")
                    self.results.put((task.id, None, str(e)))
                finally:
                    self.queue.task_done()
            except queue.Empty:
                continue  # get() timed out: loop around and re-check the event
            except Exception as e:
                logging.error(f"Worker thread error: {str(e)}")

    def _process_task(self, task: "Task") -> Any:
        """
        Process a single task. Override this method in subclass.
        """
        # Example processing
        time.sleep(0.1)  # Simulate work
        return f"Processed task {task.id} with data {task.data}"

    def submit_task(self, task: "Task") -> None:
        """Submit a task to the thread pool; blocks while the queue is full."""
        self.queue.put(task)

    def shutdown(self, wait: bool = True):
        """Signal workers to stop and, if ``wait``, join them.

        Tasks still queued when shutdown is called are not processed.
        """
        self.event.set()
        if wait:
            for worker in self.workers:
                worker.join()

    def get_results(self) -> List[tuple]:
        """Drain and return all currently available result tuples.

        BUG FIX: the original used ``while not empty(): get()``, which can
        block forever if another consumer drains the queue between the
        empty() check and the get(). get_nowait() is race-free.
        """
        results = []
        while True:
            try:
                results.append(self.results.get_nowait())
            except queue.Empty:
                return results
# Example usage
def example_usage():
    """Demonstrate the custom ThreadPool: submit tasks, wait, report results."""
    pool = ThreadPool(num_threads=4)
    try:
        # Queue up ten sample tasks.
        for task_id in range(10):
            pool.submit_task(Task(id=task_id, data=f"Sample data {task_id}"))

        # Block until every queued task has been marked done.
        pool.queue.join()

        # Report each outcome through the logging module.
        for task_id, result, error in pool.get_results():
            if error:
                logging.error(f"Task {task_id} failed: {error}")
            else:
                logging.info(f"Task {task_id} result: {result}")
    finally:
        # Always stop the workers, even if reporting raised.
        pool.shutdown()
# Alternative example using concurrent.futures
def concurrent_futures_example():
    """The same fan-out pattern built on the stdlib ThreadPoolExecutor."""
    def worker_function(x: int) -> int:
        time.sleep(0.1)  # Simulate work
        return x * x

    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        pending = [executor.submit(worker_function, n) for n in range(10)]
        # Log each result as soon as its future completes, in completion order.
        for done in concurrent.futures.as_completed(pending):
            try:
                result = done.result()
            except Exception as e:
                logging.error(f"Task failed: {str(e)}")
            else:
                logging.info(f"Got result: {result}")
# Script entry point: run both demos when executed directly (not on import).
if __name__ == "__main__":
    # Run the custom thread pool example
    example_usage()
    # Run the concurrent.futures example
    concurrent_futures_example()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment