Skip to content

Instantly share code, notes, and snippets.

@mtanco
Created February 3, 2025 18:57
Show Gist options
  • Save mtanco/82f5df3f6070a2f8e7c9b7f708e7d814 to your computer and use it in GitHub Desktop.
Save mtanco/82f5df3f6070a2f8e7c9b7f708e7d814 to your computer and use it in GitHub Desktop.
Python Multi-threading Examples: Demonstrates three common patterns - Basic Thread Synchronization, Queue-based Communication, and Modern Thread Pools
"""
Python Multi-threading Examples
-----------------------------
This example demonstrates three common patterns for multi-threading in Python:
1. Basic thread synchronization using Lock
2. Thread communication using Queue
3. Modern thread pools using concurrent.futures
Key concepts demonstrated:
- Thread creation and management
- Thread synchronization with Lock
- Thread communication with Queue
- Thread pools with concurrent.futures
- Real-world example of concurrent web requests
Author: H2O.ai
License: Apache 2.0
"""
import threading
import queue
import time
import concurrent.futures
import logging
from typing import List
import requests
from dataclasses import dataclass
from datetime import datetime
# Configure logging once at import time. Each record carries the name of the
# emitting thread so interleaved output from the examples can be told apart.
logging.basicConfig(
    format="%(asctime)s - %(threadName)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Shared state for the Lock example: `counter` is modified by
# increment_counter() while `counter_lock` is held.
counter_lock = threading.Lock()
counter = 0

# Work queue for the Queue example: main() puts tasks on it and the worker
# threads running process_queue_tasks() take them off.
task_queue = queue.Queue()
@dataclass
class WebsiteStatus:
    """Outcome of a single website check, as produced by check_website()."""

    # URL that was requested.
    url: str
    # HTTP status code of the response; check_website() stores -1 when the
    # request failed.
    status_code: int
    # Seconds the request took; check_website() stores -1 when it failed.
    response_time: float
    # When this result record was created (check_website() uses datetime.now()).
    timestamp: datetime
def increment_counter(thread_id: int, increments: int) -> None:
    """
    Increment the shared module-level ``counter`` under ``counter_lock``.

    Example of using thread locks for synchronization: each increment is a
    read / sleep / write sequence, and the lock is held across all three
    steps so increments from different threads cannot overwrite each other.

    Args:
        thread_id: Identifier for the thread; used only in the log message.
        increments: Number of times to increment the counter.
    """
    global counter
    for _ in range(increments):
        with counter_lock:
            # The read and the write are separated by a sleep on purpose.
            # Without the lock another thread could read the same starting
            # value in that window and one of the updates would be lost.
            updated = counter + 1
            time.sleep(0.1)  # Simulate some work
            counter = updated
            # Log the value this thread just wrote rather than re-reading
            # the shared global, so the message always matches this
            # thread's own increment.
            logger.info(
                "Thread %s incremented counter to %s", thread_id, updated
            )
def process_queue_tasks() -> None:
    """
    Worker loop: consume tasks from ``task_queue`` until it stays empty.

    Example of using a queue for thread communication. Every task taken
    from the queue is acknowledged with ``task_done()``, even if processing
    raises, so that a caller blocked in ``task_queue.join()`` is always
    released. The worker exits once no task arrives within one second.
    """
    while True:
        try:
            # The timeout is what allows the thread to terminate: a plain
            # get() would block forever once the queue is drained.
            task = task_queue.get(timeout=1)
        except queue.Empty:
            logger.info("No more tasks, exiting")
            break

        try:
            logger.info("Processing task: %s", task)
            time.sleep(0.5)  # Simulate processing
        finally:
            # Acknowledge in `finally`: if processing raised and this call
            # were skipped, the queue's unfinished-task count would never
            # reach zero and task_queue.join() would hang.
            task_queue.task_done()
def check_website(url: str) -> WebsiteStatus:
    """
    Check website status - example task for the thread pool.

    Issues a GET request with a 5-second timeout and records the status
    code and how long the call took. Failures are logged and reported
    through sentinel values instead of being raised, so a single bad URL
    does not abort the other checks.

    Args:
        url: Website URL to check.

    Returns:
        WebsiteStatus with the response details. If the request fails,
        ``status_code`` is -1 and ``response_time`` is -1.0.
    """
    # perf_counter() is meant for measuring short durations and, unlike
    # time.time(), is not affected by system clock adjustments.
    start_time = time.perf_counter()
    try:
        response = requests.get(url, timeout=5)
    except Exception as e:
        # Deliberately broad: this is a best-effort check, and any failure
        # (DNS, connection, timeout, malformed URL, ...) maps to the same
        # "failed" result.
        logger.error("Error checking %s: %s", url, e)
        return WebsiteStatus(
            url=url,
            status_code=-1,
            response_time=-1.0,  # float sentinel, matching the field type
            timestamp=datetime.now(),
        )

    response_time = time.perf_counter() - start_time
    return WebsiteStatus(
        url=url,
        status_code=response.status_code,
        response_time=response_time,
        timestamp=datetime.now(),
    )
def _run_counter_example() -> None:
    """Example 1: three threads increment the shared counter under a lock."""
    logger.info("\n1. Basic Threading Example:")
    threads = []
    for i in range(3):
        thread = threading.Thread(
            target=increment_counter,
            args=(i, 3),
            name=f"Counter-{i}",
        )
        threads.append(thread)
        thread.start()

    # Wait for counter threads to finish before reading the final value.
    for thread in threads:
        thread.join()
    logger.info("Final counter value: %s", counter)


def _run_queue_example() -> None:
    """Example 2: two worker threads drain five tasks from the shared queue."""
    logger.info("\n2. Queue Communication Example:")
    workers = []
    for i in range(2):
        worker = threading.Thread(
            target=process_queue_tasks,
            name=f"Worker-{i}",
        )
        workers.append(worker)
        worker.start()

    # Workers are already blocked in get(timeout=1) at this point, so the
    # tasks must be enqueued promptly or the workers will give up and exit.
    for i in range(5):
        task_queue.put(f"Task-{i}")

    # Wait until every task has been acknowledged with task_done() ...
    task_queue.join()
    # ... then for the workers to hit their idle timeout and return.
    for worker in workers:
        worker.join()


def _run_thread_pool_example() -> None:
    """Example 3: check several websites concurrently with a thread pool."""
    logger.info("\n3. Thread Pool Example:")
    websites = [
        "http://example.com",
        "http://google.com",
        "http://github.com",
    ]
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        # Map each future back to its URL so results can be labelled even
        # though they are handled in completion order, not submission order.
        future_to_url = {
            executor.submit(check_website, url): url
            for url in websites
        }
        for future in concurrent.futures.as_completed(future_to_url):
            url = future_to_url[future]
            try:
                status = future.result()
            except Exception as e:
                logger.error("Error processing %s: %s", url, e)
            else:
                logger.info(
                    "Website %s: Status=%s, Time=%.2fs",
                    url,
                    status.status_code,
                    status.response_time,
                )


def main() -> None:
    """Run the three threading examples one after another."""
    logger.info("Starting threading examples...")
    _run_counter_example()
    _run_queue_example()
    _run_thread_pool_example()


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment