Skip to content

Instantly share code, notes, and snippets.

@patx
Last active January 10, 2025 04:24
Show Gist options
  • Select an option

  • Save patx/5c12d495ff142f3262325eeae81eb000 to your computer and use it in GitHub Desktop.

Select an option

Save patx/5c12d495ff142f3262325eeae81eb000 to your computer and use it in GitHub Desktop.
Async scripts to help with pickleDB. Use with caution — these are developmental. Please post suggestions: https://github.com/patx/pickledb/issues
import threading
from concurrent.futures import ThreadPoolExecutor
class AsyncPickleDB:
    """Wrap a PickleDB instance with a lock and a thread pool so that
    database operations can be executed asynchronously without racing
    each other.
    """

    def __init__(self, db_instance, max_workers=5):
        """
        Initialize the AsyncPickleDB with a lock and a thread pool executor.

        Args:
            db_instance (PickleDB): An instance of PickleDB to wrap.
            max_workers (int): Maximum number of threads for the executor.

        Raises:
            ValueError: If db_instance is not a PickleDB instance.
        """
        if not isinstance(db_instance, PickleDB):
            raise ValueError("db_instance must be an instance of PickleDB")
        self.db = db_instance
        # RLock so a submitted task may itself call thread_safe_call
        # (directly or indirectly) without deadlocking.
        self._lock = threading.RLock()
        self.executor = ThreadPoolExecutor(max_workers=max_workers)

    def execute_async(self, func, *args, **kwargs):
        """
        Execute a function asynchronously using the thread pool.

        The call is routed through thread_safe_call so that concurrently
        submitted tasks serialize on the database lock instead of racing.

        Args:
            func (callable): The function to execute.
            *args: Arguments for the function.
            **kwargs: Keyword arguments for the function.

        Returns:
            concurrent.futures.Future: A Future representing the execution.
        """
        # Fix: previously `func` was submitted directly, bypassing the lock
        # and making concurrent async calls unsafe despite the class name.
        return self.executor.submit(self.thread_safe_call, func, *args, **kwargs)

    def thread_safe_call(self, method, *args, **kwargs):
        """
        Make a lock-guarded (thread-safe) call to the database.

        Args:
            method (callable): The method to call on the database.
            *args: Arguments for the method.
            **kwargs: Keyword arguments for the method.

        Returns:
            Any: The result of the method call.
        """
        with self._lock:
            return method(*args, **kwargs)

    def close(self):
        """
        Shut down the thread pool executor, waiting for all pending
        tasks to finish, then persist the database under the lock.
        """
        self.executor.shutdown(wait=True)
        with self._lock:
            self.db.save()
"""
EXAMPLE USAGE:
from pickledb import PickleDB
from pickledb_async import AsyncSaver
db = PickleDB("example.db", False) # ***MAKE SURE AUTO_SAVE IS SET TO FALSE***
a = AsyncSaver(db)
num_pairs = 100
print("setting keys...")
for i in range(num_pairs):
db.set(f"key{i}", f"value{i}")
a.bgsave()
print("all entries in memory...")
print("getting keys...")
for i in range(num_pairs):
db.get(f"key{i}")
print("retrieved all entries...")
a.shutdown()
"""
import threading
import queue
class AsyncSaver:
    """Queue save requests for a pickleDB instance and process them on a
    daemon background thread, so callers never block on disk writes.
    """

    # Sentinel object that tells the worker thread to exit its loop.
    _STOP = object()

    def __init__(self, db):
        """
        Initialize the AsyncSaver wrapper for pickleDB.

        Args:
            db (PickleDB): An instance of the PickleDB class.
        """
        self.db = db
        self.save_queue = queue.Queue()
        self.background_thread = threading.Thread(
            target=self._process_save_queue, daemon=True
        )
        self.background_thread.start()

    def _process_save_queue(self):
        """
        Continuously process save requests from the queue until the stop
        sentinel is received.
        """
        while True:
            try:
                item = self.save_queue.get(timeout=0.5)
            except queue.Empty:
                continue  # nothing queued; poll again
            try:
                if item is self._STOP:
                    return  # shutdown requested; finally still runs
                print("Processing async save...")
                success = self.db.save()
                if success:
                    print("Async save completed successfully.")
                else:
                    print("Async save failed.")
            finally:
                # Fix: always mark the task done, even if db.save() raised;
                # otherwise shutdown()'s queue.join() would hang forever.
                self.save_queue.task_done()

    def bgsave(self):
        """
        Enqueue a save request for asynchronous processing.
        """
        print("Queuing async save...")
        self.save_queue.put(True)

    def shutdown(self):
        """
        Wait for all queued async save tasks to complete, then actually
        stop the background thread.
        """
        print("Shutting down: Waiting for all async saves to complete...")
        # Fix: the sentinel is processed after all earlier requests, so the
        # worker exits once the queue drains; previously the thread was
        # never stopped despite the log message claiming otherwise.
        self.save_queue.put(self._STOP)
        self.save_queue.join()  # wait for all queued tasks (incl. sentinel)
        self.background_thread.join(timeout=5)
        print("All async saves completed and background thread stopped.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment