Last active
January 10, 2025 04:24
-
-
Save patx/5c12d495ff142f3262325eeae81eb000 to your computer and use it in GitHub Desktop.
Async helper scripts for pickleDB. Use with caution: these are still in development. Please post suggestions: https://github.com/patx/pickledb/issues
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import threading | |
| from concurrent.futures import ThreadPoolExecutor | |
class AsyncPickleDB:
    """Wrap a PickleDB-style object with a thread pool and a re-entrant lock.

    The wrapper does not intercept calls made directly on ``self.db``; only
    calls routed through :meth:`thread_safe_call` are serialized by the lock.
    """

    def __init__(self, db_instance, max_workers=5):
        """
        Initialize the AsyncPickleDB with threading and thread pool executor.

        Args:
            db_instance (PickleDB): An instance of PickleDB to wrap. Any
                object exposing a callable ``save()`` is accepted.
            max_workers (int): Maximum number of threads for the executor.

        Raises:
            ValueError: If ``db_instance`` has no callable ``save`` attribute.
        """
        # ``PickleDB`` is not imported in this module, so an isinstance check
        # against it would fail with NameError. Validate the one capability
        # this wrapper actually relies on (``save``, used by ``close``).
        if not callable(getattr(db_instance, "save", None)):
            raise ValueError("db_instance must be an instance of PickleDB")
        self.db = db_instance
        # Re-entrant so a method invoked via thread_safe_call may itself call
        # thread_safe_call on the same thread without deadlocking.
        self._lock = threading.RLock()
        self.executor = ThreadPoolExecutor(max_workers=max_workers)

    def execute_async(self, func, *args, **kwargs):
        """
        Execute a function asynchronously using a thread pool.

        The call is NOT wrapped in the lock; submit ``thread_safe_call``
        itself when the function touches the database.

        Args:
            func (callable): The function to execute.
            *args: Arguments for the function.
            **kwargs: Keyword arguments for the function.

        Returns:
            concurrent.futures.Future: A Future object representing the execution.
        """
        return self.executor.submit(func, *args, **kwargs)

    def thread_safe_call(self, method, *args, **kwargs):
        """
        Wrapper for making thread-safe calls to the database.

        Args:
            method (callable): The method to call on the database.
            *args: Arguments for the method.
            **kwargs: Keyword arguments for the method.

        Returns:
            Any: The result of the method call.
        """
        with self._lock:
            return method(*args, **kwargs)

    def close(self):
        """
        Shutdown the thread pool executor and save the database.

        ``shutdown()`` is called with its default arguments, so it waits for
        already-submitted work to finish before the final save runs.
        """
        self.executor.shutdown()
        with self._lock:
            self.db.save()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""
EXAMPLE USAGE:

    from pickledb import PickleDB
    from pickledb_async import AsyncSaver

    db = PickleDB("example.db", False)  # ***MAKE SURE AUTO_SAVE IS SET TO FALSE***
    a = AsyncSaver(db)
    num_pairs = 100

    print("setting keys...")
    for i in range(num_pairs):
        db.set(f"key{i}", f"value{i}")
    a.bgsave()
    print("all entries in memory...")

    print("getting keys...")
    for i in range(num_pairs):
        db.get(f"key{i}")
    print("retrieved all entries...")

    a.shutdown()
"""
| import threading | |
| import queue | |
class AsyncSaver:
    """Run ``db.save()`` on a background daemon thread, one request at a time.

    Save requests are queued by :meth:`bgsave` and processed in FIFO order by
    a single worker thread. :meth:`shutdown` drains the queue and stops the
    worker; a later :meth:`bgsave` transparently starts a fresh worker.
    """

    # Queue marker that tells the worker thread to exit its loop.
    _STOP = object()

    def __init__(self, db):
        """
        Initialize the AsyncSaver wrapper for pickleDB.

        Args:
            db (PickleDB): An instance of the PickleDB class.
        """
        self.db = db
        self.save_queue = queue.Queue()
        # Serializes bgsave()/shutdown() so a request can never be enqueued
        # behind the stop marker of a worker that is about to exit.
        self._state_lock = threading.Lock()
        self.background_thread = self._start_worker()

    def _start_worker(self):
        """Create, start and return a daemon thread running the save loop."""
        worker = threading.Thread(target=self._process_save_queue, daemon=True)
        worker.start()
        return worker

    def _process_save_queue(self):
        """
        Continuously process save requests from the queue in the background.

        The loop ends when the stop marker is dequeued. ``task_done()`` is
        called for every dequeued item, even when ``save()`` raises, so that
        ``save_queue.join()`` cannot block forever on a failed save.
        """
        while True:
            # Block until a save request (or the stop marker) arrives.
            request = self.save_queue.get()
            try:
                if request is self._STOP:
                    return
                print("Processing async save...")
                try:
                    success = self.db.save()
                except Exception as exc:
                    # Keep the worker alive; one failed save must not strand
                    # the remaining queued requests.
                    print(f"Async save failed: {exc!r}")
                    continue
                if success:
                    print("Async save completed successfully.")
                else:
                    print("Async save failed.")
            finally:
                self.save_queue.task_done()

    def bgsave(self):
        """
        Enqueue a save request for asynchronous processing.

        If the worker was stopped by a previous ``shutdown()``, a new worker
        is started first so the request is still processed.
        """
        print("Queuing async save...")
        with self._state_lock:
            if not self.background_thread.is_alive():
                self.background_thread = self._start_worker()
            self.save_queue.put(True)

    def shutdown(self):
        """
        Wait for all async save tasks to complete and stop the background thread.

        Safe to call more than once; calls after the worker has stopped only
        print the status messages.
        """
        print("Shutting down: Waiting for all async saves to complete...")
        with self._state_lock:
            if self.background_thread.is_alive():
                self.save_queue.join()  # Wait for all queued tasks to complete
                self.save_queue.put(self._STOP)  # Wake the worker so it exits
                self.background_thread.join()
        print("All async saves completed and background thread stopped.")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment