Created
May 31, 2025 22:56
-
-
Save patrickwolf/debe1f981378478273e2171a584982da to your computer and use it in GitHub Desktop.
SQLite3's Write-Ahead Logging (WAL) mode - multi thread read/write example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sqlite3 | |
import threading | |
import time | |
import os | |
import logging | |
# Configure basic logging | |
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(threadName)s - %(message)s') | |
DB_NAME = "production_wal_app.db" | |
# Marker file intended to record that this application instance has already
# tried to enable WAL mode. Nothing in this script creates it; it is only
# removed during the start-up cleanup below.
# In a real-world scenario you might not need this if you control the DB creation
# environment or can reliably check PRAGMA journal_mode on first connect without
# a performance penalty.
WAL_INIT_MARKER = ".wal_initialized_marker" | |
def initialize_database(db_path):
    """
    Initialize the database: enable WAL journaling and create the schema.

    Intended to be called once at application start-up, before worker
    threads open their own connections.

    Args:
        db_path: Path of the SQLite database file. The file is created when
            it does not exist yet.

    Raises:
        sqlite3.Error: Re-raised (after being logged) when any database
            step fails.
    """
    first_time_setup = not os.path.exists(db_path)
    conn = None
    try:
        # In Python's sqlite3 module, `timeout` is the number of seconds to
        # wait on a locked database before raising OperationalError.
        conn = sqlite3.connect(db_path, timeout=10.0)
        # Explicit busy timeout for this connection, in milliseconds
        # (5000 ms = 5 seconds).
        conn.execute("PRAGMA busy_timeout = 5000;")

        current_mode = conn.execute("PRAGMA journal_mode;").fetchone()[0]
        logging.info(f"Current journal_mode: {current_mode}")
        if current_mode.lower() != "wal":
            # The pragma returns the journal mode that is actually in effect
            # afterwards. SQLite can decline the switch (an in-memory
            # database reports "memory"), so trust the returned value rather
            # than assuming success.
            new_mode = conn.execute("PRAGMA journal_mode = WAL;").fetchone()[0]
            if new_mode.lower() == "wal":
                logging.info(f"WAL mode enabled. New journal_mode: {new_mode}")
            else:
                logging.warning(
                    f"Could not enable WAL mode; journal_mode remains: {new_mode}"
                )
        else:
            logging.info("WAL mode was already active.")

        # Create the table if it doesn't exist.
        conn.execute("""
            CREATE TABLE IF NOT EXISTS app_data (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                thread_name TEXT,
                message TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        conn.commit()

        if first_time_setup:
            logging.info(f"Database '{db_path}' created and schema initialized.")
        else:
            logging.info(f"Database schema verified for '{db_path}'.")
    except sqlite3.Error as e:
        logging.error(f"Database initialization error: {e}", exc_info=True)
        # Start-up cannot proceed without a usable database, so let the
        # caller decide how to react.
        raise
    finally:
        if conn is not None:
            conn.close()
def _write_with_retry(conn, task_id, message, max_retries):
    """
    Insert one row in its own transaction, retrying on lock contention.

    Returns:
        True when the caller may continue with the next message (the row was
        committed, or the retries were exhausted on a lock error); False when
        a non-recoverable database error occurred and the caller should stop.
    """
    attempts = max(0, max_retries) + 1
    for attempt in range(attempts):
        try:
            # BEGIN IMMEDIATE takes the write lock up front, so lock
            # contention surfaces here rather than partway through the
            # transaction.
            conn.execute("BEGIN IMMEDIATE;")
            conn.execute(
                "INSERT INTO app_data (thread_name, message) VALUES (?, ?)",
                (threading.current_thread().name, message),
            )
            conn.commit()
        except sqlite3.OperationalError as e:
            conn.rollback()
            if "database is locked" not in str(e):
                logging.error(f"Writer {task_id} OperationalError: {e}", exc_info=True)
                return False
            if attempt + 1 < attempts:
                logging.warning(f"Writer {task_id} experienced a lock, retrying: {e}")
                time.sleep(0.1)  # Back off briefly before retrying this transaction
            else:
                logging.error(
                    f"Writer {task_id} gave up on '{message}' "
                    f"after {attempts} attempt(s): {e}"
                )
        except sqlite3.Error as e:
            conn.rollback()
            logging.error(f"Writer {task_id} database error: {e}", exc_info=True)
            return False
        else:
            logging.info(f"Writer {task_id} committed: {message}")
            return True
    # Retries exhausted on lock errors: skip this message, keep the task alive.
    return True


def writer_task(task_id, num_writes, db_path=None, max_retries=3):
    """
    Simulate a task that writes rows to the database.

    Args:
        task_id: Identifier used in log lines and in the stored messages.
        num_writes: Number of rows this task attempts to insert.
        db_path: Database file to write to; defaults to the module-level
            DB_NAME when omitted.
        max_retries: How many times a single insert is retried after a
            "database is locked" error before it is skipped.

    Database errors are logged rather than raised, so a failing writer does
    not propagate an exception out of its thread.
    """
    if db_path is None:
        db_path = DB_NAME
    logging.info(f"Writer task {task_id} starting.")
    conn = None
    try:
        # This connection is created and used only inside this call, so each
        # thread running the task works on its own connection object.
        conn = sqlite3.connect(db_path, timeout=10.0)
        conn.execute("PRAGMA busy_timeout = 7000;")  # Longer timeout for writers

        for i in range(num_writes):
            message = f"Message {i} from writer {task_id}"
            if not _write_with_retry(conn, task_id, message, max_retries):
                break  # Non-recoverable error: stop writing
            time.sleep(0.01)  # Simulate some work
    except sqlite3.Error as e:
        logging.error(f"Writer {task_id} connection or setup error: {e}", exc_info=True)
    finally:
        if conn is not None:
            conn.close()
        logging.info(f"Writer task {task_id} finished.")
def reader_task(task_id, num_reads, db_path=None):
    """
    Simulate a task that repeatedly reads from the database.

    Args:
        task_id: Identifier used in log lines.
        num_reads: Number of read iterations to perform.
        db_path: Database file to read from; defaults to the module-level
            DB_NAME when omitted.

    Database errors are logged rather than raised, so a failing reader does
    not propagate an exception out of its thread.
    """
    if db_path is None:
        db_path = DB_NAME
    logging.info(f"Reader task {task_id} starting.")
    conn = None
    try:
        conn = sqlite3.connect(db_path, timeout=10.0)
        conn.execute("PRAGMA busy_timeout = 5000;")
        # In WAL mode readers do not block writers and are not blocked by them.

        for _ in range(num_reads):
            try:
                count = conn.execute("SELECT COUNT(*) FROM app_data;").fetchone()[0]
                logging.info(f"Reader {task_id} found {count} records.")

                # Fetch the newest few rows to exercise a second query.
                recent = conn.execute(
                    "SELECT id, message FROM app_data ORDER BY id DESC LIMIT 3;"
                ).fetchall()
                logging.debug("Reader %s fetched: %s", task_id, recent)
            except sqlite3.Error as e:
                logging.error(f"Reader {task_id} database error: {e}", exc_info=True)
                break  # Exit loop on error
            time.sleep(0.05)  # Simulate some work
    except sqlite3.Error as e:
        logging.error(f"Reader {task_id} connection or setup error: {e}", exc_info=True)
    finally:
        if conn is not None:
            conn.close()
        logging.info(f"Reader task {task_id} finished.")
if __name__ == "__main__":
    # Demo driver: start from a fresh database, run concurrent writers and
    # readers against it, then report what ended up in the table.

    # Clean up the previous database file for a fresh run (optional).
    try:
        os.remove(DB_NAME)
    except FileNotFoundError:
        pass
    else:
        logging.info(f"Removed existing database {DB_NAME}")
    # Remove WAL sidecar files and the marker file if a previous run left them.
    for leftover in (f"{DB_NAME}-wal", f"{DB_NAME}-shm", WAL_INIT_MARKER):
        try:
            os.remove(leftover)
        except FileNotFoundError:
            pass

    # 1. Initialize the database (enable WAL, create schema).
    initialize_database(DB_NAME)

    # 2. Build the worker threads.
    writer_count = 10
    writes_per_writer = 10
    reader_count = 20
    reads_per_reader = 15

    threads = [
        threading.Thread(
            target=writer_task,
            args=(n, writes_per_writer),
            name=f"WriterThread-{n}",
        )
        for n in range(1, writer_count + 1)
    ]
    threads += [
        threading.Thread(
            target=reader_task,
            args=(n, reads_per_reader),
            name=f"ReaderThread-{n}",
        )
        for n in range(1, reader_count + 1)
    ]

    # 3. Start all threads, then wait for every one of them to complete.
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    logging.info("All tasks completed. Check the database file and logs.")
    # SQLite normally deletes the -wal and -shm files when the last
    # connection to the database closes, so they are not expected to remain.
    logging.info(
        f"Database file '{DB_NAME}' should now exist; '{DB_NAME}-wal' and "
        f"'{DB_NAME}-shm' are normally removed once the last connection closes."
    )

    # 4. Final check of the data.
    final_conn = None
    try:
        final_conn = sqlite3.connect(DB_NAME)
        cursor = final_conn.execute("SELECT COUNT(*) FROM app_data;")
        total_records = cursor.fetchone()[0]
        logging.info(f"Final total records in database: {total_records}")

        cursor.execute("SELECT * FROM app_data ORDER BY id DESC LIMIT 5;")
        for row in cursor.fetchall():
            logging.info(f"Sample data: {row}")
    except sqlite3.Error as e:
        logging.error(f"Error during final data check: {e}")
    finally:
        if final_conn is not None:
            final_conn.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment