Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save patrickwolf/debe1f981378478273e2171a584982da to your computer and use it in GitHub Desktop.
Save patrickwolf/debe1f981378478273e2171a584982da to your computer and use it in GitHub Desktop.
SQLite3's Write-Ahead Logging (WAL) mode - multi-threaded read/write example
import sqlite3
import threading
import time
import os
import logging
# Root logger setup: INFO and above, every record stamped with its time,
# severity and the name of the thread that emitted it.
_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(threadName)s - %(message)s'
logging.basicConfig(
    format=_LOG_FORMAT,
    level=logging.INFO,
)

# Path of the SQLite database file that the worker threads connect to.
DB_NAME = "production_wal_app.db"

# Name of a marker file meant to record that this application already tried
# to switch the database to WAL mode. Nothing in this file creates it; the
# script entry point only deletes it if it is present. A real application
# usually does not need such a marker when it controls how the database is
# created, or can simply check `PRAGMA journal_mode` on first connect.
WAL_INIT_MARKER = ".wal_initialized_marker"
def initialize_database(db_path):
    """
    Initialize the database: enable WAL journaling and create the schema.

    This function should ideally be called once at application startup.

    Args:
        db_path: Path of the SQLite database file. The file is created if it
            does not exist yet.

    Raises:
        sqlite3.Error: Logged (with traceback) and re-raised when connecting,
            switching the journal mode, or creating the table fails.
    """
    first_time_setup = not os.path.exists(db_path)
    conn = None
    try:
        # In Python's sqlite3 module `timeout` is the number of seconds a
        # statement waits on a locked database before OperationalError is
        # raised; it is not a "connection attempt" timeout.
        conn = sqlite3.connect(db_path, timeout=10.0)
        # busy_timeout configures the same busy handler (in milliseconds), so
        # from here on this connection waits at most 5 seconds on a lock.
        conn.execute("PRAGMA busy_timeout = 5000;")

        # WAL mode is persistent for the database file, so it only has to be
        # switched on when it is not already active.
        current_mode = conn.execute("PRAGMA journal_mode;").fetchone()[0]
        logging.info(f"Current journal_mode: {current_mode}")
        if current_mode.lower() != "wal":
            # The pragma returns the journal mode that is in effect afterwards;
            # if the switch was not possible (e.g. an in-memory database) the
            # previous mode comes back, so check it instead of assuming.
            new_mode = conn.execute("PRAGMA journal_mode = WAL;").fetchone()[0]
            if new_mode.lower() == "wal":
                logging.info(f"WAL mode enabled. New journal_mode: {new_mode}")
            else:
                logging.warning(
                    f"Could not enable WAL mode; journal_mode is still: {new_mode}"
                )
        else:
            logging.info("WAL mode was already active.")

        # Create table if it doesn't exist
        conn.execute("""
            CREATE TABLE IF NOT EXISTS app_data (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                thread_name TEXT,
                message TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        conn.commit()

        if first_time_setup:
            logging.info(f"Database '{db_path}' created and schema initialized.")
        else:
            logging.info(f"Database schema verified for '{db_path}'.")
    except sqlite3.Error as e:
        logging.error(f"Database initialization error: {e}", exc_info=True)
        # Re-raise so the caller does not start workers on a broken database.
        raise
    finally:
        if conn:
            conn.close()
def _insert_with_retry(conn, task_id, message, max_retries):
    """Insert one row in its own transaction, retrying while the database is locked.

    Returns True when the writer may carry on with its next message (the row
    was committed, or it was abandoned after ``max_retries`` lock retries) and
    False when a non-lock database error occurred and the writer should stop.
    """
    thread_name = threading.current_thread().name
    retries_allowed = max(max_retries, 0)
    for attempt in range(retries_allowed + 1):
        try:
            # Explicit transaction
            conn.execute("BEGIN;")
            conn.execute(
                "INSERT INTO app_data (thread_name, message) VALUES (?, ?)",
                (thread_name, message),
            )
            conn.commit()
        except sqlite3.OperationalError as e:
            conn.rollback()
            if "database is locked" not in str(e):
                logging.error(f"Writer {task_id} OperationalError: {e}", exc_info=True)
                return False
            if attempt == retries_allowed:
                logging.error(
                    f"Writer {task_id} gave up on '{message}' "
                    f"after {retries_allowed} retries: {e}"
                )
                return True
            logging.warning(f"Writer {task_id} experienced a lock, retrying: {e}")
            time.sleep(0.1)  # Wait a bit before retrying the whole transaction
        except sqlite3.Error as e:
            conn.rollback()
            logging.error(f"Writer {task_id} database error: {e}", exc_info=True)
            return False
        else:
            logging.info(f"Writer {task_id} committed: {message}")
            return True
    return True


def writer_task(task_id, num_writes, db_path=None, max_retries=3):
    """Simulate a task that writes rows to the ``app_data`` table.

    Args:
        task_id: Identifier used in log lines and in the stored messages.
        num_writes: Number of messages to insert, one transaction each.
        db_path: Database file to write to; defaults to the module-level
            ``DB_NAME`` when omitted.
        max_retries: How many times a single insert is retried after a
            "database is locked" error before that message is abandoned.

    Database errors are logged rather than raised. A non-lock error stops the
    task early.
    """
    if db_path is None:
        db_path = DB_NAME
    logging.info(f"Writer task {task_id} starting.")
    conn = None
    try:
        # Each thread MUST use its own connection object.
        conn = sqlite3.connect(db_path, timeout=10.0)
        # busy_timeout replaces the wait configured by `timeout=` above, so
        # this connection waits up to 7 seconds on a locked database.
        conn.execute("PRAGMA busy_timeout = 7000;")
        # WAL mode is persistent for the database file, so it is not set
        # again here; it only needs to be enabled once.
        for i in range(num_writes):
            message = f"Message {i} from writer {task_id}"
            if not _insert_with_retry(conn, task_id, message, max_retries):
                break  # Exit loop on non-lock errors
            time.sleep(0.01)  # Simulate some work
    except sqlite3.Error as e:
        logging.error(f"Writer {task_id} connection or setup error: {e}", exc_info=True)
    finally:
        if conn:
            conn.close()
        logging.info(f"Writer task {task_id} finished.")
def reader_task(task_id, num_reads, db_path=None):
    """Simulate a task that reads from the ``app_data`` table.

    Each iteration logs the current row count and then fetches the three
    newest rows (the fetched rows are discarded).

    Args:
        task_id: Identifier used in log lines.
        num_reads: Number of read iterations to perform.
        db_path: Database file to read from; defaults to the module-level
            ``DB_NAME`` when omitted.

    Database errors are logged rather than raised. An error during a read
    stops the task early.
    """
    if db_path is None:
        db_path = DB_NAME
    logging.info(f"Reader task {task_id} starting.")
    conn = None
    try:
        conn = sqlite3.connect(db_path, timeout=10.0)
        conn.execute("PRAGMA busy_timeout = 5000;")
        # Readers benefit from WAL as they don't block writers and aren't
        # blocked by them.
        for _ in range(num_reads):
            try:
                count = conn.execute("SELECT COUNT(*) FROM app_data;").fetchone()[0]
                logging.info(f"Reader {task_id} found {count} records.")
                # Fetch a few records; only the read itself matters here.
                conn.execute(
                    "SELECT id, message FROM app_data ORDER BY id DESC LIMIT 3;"
                ).fetchall()
            except sqlite3.Error as e:
                logging.error(f"Reader {task_id} database error: {e}", exc_info=True)
                break  # Exit loop on error
            time.sleep(0.05)  # Simulate some work
    except sqlite3.Error as e:
        logging.error(f"Reader {task_id} connection or setup error: {e}", exc_info=True)
    finally:
        if conn:
            conn.close()
        logging.info(f"Reader task {task_id} finished.")
if __name__ == "__main__":
    # Workload: how many threads of each kind to start and how many
    # operations each of them performs.
    NUM_WRITERS, WRITES_PER_WRITER = 10, 10
    NUM_READERS, READS_PER_READER = 20, 15

    # Clean up files from a previous run for a fresh start (optional).
    # Removing and tolerating "not found" avoids the check-then-delete race.
    for stale_path in (DB_NAME, f"{DB_NAME}-wal", f"{DB_NAME}-shm", WAL_INIT_MARKER):
        try:
            os.remove(stale_path)
        except FileNotFoundError:
            continue
        if stale_path == DB_NAME:
            logging.info(f"Removed existing database {DB_NAME}")

    # 1. Initialize Database (Enable WAL, Create Schema)
    initialize_database(DB_NAME)

    # 2. Create the writer threads, then the reader threads.
    threads = [
        threading.Thread(
            target=writer_task,
            args=(n, WRITES_PER_WRITER),
            name=f"WriterThread-{n}",
        )
        for n in range(1, NUM_WRITERS + 1)
    ]
    threads += [
        threading.Thread(
            target=reader_task,
            args=(n, READS_PER_READER),
            name=f"ReaderThread-{n}",
        )
        for n in range(1, NUM_READERS + 1)
    ]

    # 3. Start all threads, then wait for all of them to complete.
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    logging.info("All tasks completed. Check the database file and logs.")
    # Every worker has closed its connection by now; SQLite normally
    # checkpoints and deletes the -wal/-shm files when the last connection
    # closes, so only the main database file is expected to remain.
    logging.info(
        f"Database file '{DB_NAME}' should now exist; '{DB_NAME}-wal' and "
        f"'{DB_NAME}-shm' are normally removed once the last connection closes."
    )

    # 4. Final check of data
    final_conn = None
    try:
        final_conn = sqlite3.connect(DB_NAME)
        cursor = final_conn.execute("SELECT COUNT(*) FROM app_data;")
        total_records = cursor.fetchone()[0]
        logging.info(f"Final total records in database: {total_records}")
        cursor.execute("SELECT * FROM app_data ORDER BY id DESC LIMIT 5;")
        for row in cursor.fetchall():
            logging.info(f"Sample data: {row}")
    except sqlite3.Error as e:
        logging.error(f"Error during final data check: {e}")
    finally:
        if final_conn:
            final_conn.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment