Created
February 3, 2025 18:57
-
-
Save mtanco/82f5df3f6070a2f8e7c9b7f708e7d814 to your computer and use it in GitHub Desktop.
Python Multi-threading Examples: Demonstrates three common patterns - Basic Thread Synchronization, Queue-based Communication, and Modern Thread Pools
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Python Multi-threading Examples | |
----------------------------- | |
This example demonstrates three common patterns for multi-threading in Python: | |
1. Basic thread synchronization using Lock | |
2. Thread communication using Queue | |
3. Modern thread pools using concurrent.futures | |
Key concepts demonstrated: | |
- Thread creation and management | |
- Thread synchronization with Lock | |
- Thread communication with Queue | |
- Thread pools with concurrent.futures | |
- Real-world example of concurrent web requests | |
Author: H2O.ai | |
License: Apache 2.0 | |
""" | |
import threading | |
import queue | |
import time | |
import concurrent.futures | |
import logging | |
from typing import List | |
import requests | |
from dataclasses import dataclass | |
from datetime import datetime | |
# Set up logging | |
logging.basicConfig( | |
level=logging.INFO, | |
format='%(asctime)s - %(threadName)s - %(message)s' | |
) | |
logger = logging.getLogger(__name__) | |
# Shared counter with lock for thread synchronization example | |
counter = 0 | |
counter_lock = threading.Lock() | |
# Queue for thread communication example | |
task_queue = queue.Queue() | |
@dataclass | |
class WebsiteStatus: | |
"""Data class to store website check results""" | |
url: str | |
status_code: int | |
response_time: float | |
timestamp: datetime | |
def increment_counter(thread_id: int, increments: int): | |
""" | |
Example of using thread locks for synchronization. | |
This demonstrates how to safely modify shared state (counter) | |
across multiple threads using a Lock. | |
Args: | |
thread_id: Identifier for the thread | |
increments: Number of times to increment the counter | |
""" | |
global counter | |
for _ in range(increments): | |
with counter_lock: # Thread-safe counter increment | |
current = counter | |
time.sleep(0.1) # Simulate some work | |
counter = current + 1 | |
logger.info(f"Thread {thread_id} incremented counter to {counter}") | |
def process_queue_tasks(): | |
""" | |
Example of using queue for thread communication. | |
This demonstrates how to use Queue for safely passing tasks | |
between threads, with proper task completion signaling. | |
""" | |
while True: | |
try: | |
# Get task with timeout to allow thread termination | |
task = task_queue.get(timeout=1) | |
logger.info(f"Processing task: {task}") | |
time.sleep(0.5) # Simulate processing | |
task_queue.task_done() | |
except queue.Empty: | |
logger.info("No more tasks, exiting") | |
break | |
def check_website(url: str) -> WebsiteStatus: | |
""" | |
Check website status - example for thread pool. | |
This demonstrates a real-world use case for thread pools, | |
making concurrent HTTP requests. | |
Args: | |
url: Website URL to check | |
Returns: | |
WebsiteStatus object with response details | |
""" | |
try: | |
start_time = time.time() | |
response = requests.get(url, timeout=5) | |
response_time = time.time() - start_time | |
return WebsiteStatus( | |
url=url, | |
status_code=response.status_code, | |
response_time=response_time, | |
timestamp=datetime.now() | |
) | |
except Exception as e: | |
logger.error(f"Error checking {url}: {str(e)}") | |
return WebsiteStatus( | |
url=url, | |
status_code=-1, | |
response_time=-1, | |
timestamp=datetime.now() | |
) | |
def main(): | |
logger.info("Starting threading examples...") | |
# 1. Basic thread creation example | |
logger.info("\n1. Basic Threading Example:") | |
threads = [] | |
for i in range(3): | |
thread = threading.Thread( | |
target=increment_counter, | |
args=(i, 3), | |
name=f"Counter-{i}" | |
) | |
threads.append(thread) | |
thread.start() | |
# Wait for counter threads to finish | |
for thread in threads: | |
thread.join() | |
logger.info(f"Final counter value: {counter}") | |
# 2. Queue communication example | |
logger.info("\n2. Queue Communication Example:") | |
# Create worker threads | |
workers = [] | |
for i in range(2): | |
worker = threading.Thread( | |
target=process_queue_tasks, | |
name=f"Worker-{i}" | |
) | |
workers.append(worker) | |
worker.start() | |
# Add tasks to queue | |
for i in range(5): | |
task_queue.put(f"Task-{i}") | |
# Wait for all tasks to be processed | |
task_queue.join() | |
# Wait for workers to finish | |
for worker in workers: | |
worker.join() | |
# 3. Thread pool example with website checker | |
logger.info("\n3. Thread Pool Example:") | |
websites = [ | |
"http://example.com", | |
"http://google.com", | |
"http://github.com" | |
] | |
# Using thread pool executor | |
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: | |
# Submit all tasks and get futures | |
future_to_url = { | |
executor.submit(check_website, url): url | |
for url in websites | |
} | |
# Process results as they complete | |
for future in concurrent.futures.as_completed(future_to_url): | |
url = future_to_url[future] | |
try: | |
status = future.result() | |
logger.info( | |
f"Website {url}: " | |
f"Status={status.status_code}, " | |
f"Time={status.response_time:.2f}s" | |
) | |
except Exception as e: | |
logger.error(f"Error processing {url}: {str(e)}") | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment