Last active
September 23, 2023 07:17
-
-
Save proguy914629bot/f5f89bf25d22f4e500bbd1ef8005c1be to your computer and use it in GitHub Desktop.
A Simple Cache Implementation in Python.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
My license: | |
PLEASE PLEASE PLEASE DO NOT COPY PASTE THIS CODE. | |
Steal all = Give credit or not and star or not = Me Cri :( | |
Steal some = Give some credit or star = Me happy :) | |
Steal some = Give no credit and no star = Me Cri :( | |
At least star this gist if you want to take some of the code for inspiration. | |
""" | |
import typing, asyncio | |
class CacheNotFound(Exception):
    """Raised when a cache entry cannot be found.

    Parameters
    ----------
    name
        The name (or other identifier) of the cache that was looked up.
        It is kept on the instance as ``name`` so handlers can inspect it.
    """

    def __init__(self, name):
        self.name = name
        # Wrap ``name`` in a 1-tuple: a bare ``"%s" % name`` raises TypeError
        # when ``name`` is itself a tuple with more than one element.
        super().__init__("Cache %s is not found." % (name,))
class CacheInstanceError(Exception):
    """Raised when an argument is not of an accepted type.

    Parameters
    ----------
    argname
        Name of the offending argument.
    argtype
        Description of the type that was actually received.
    expected
        Description of the type(s) that would have been accepted.
    function
        Name of the function that rejected the argument.
    """

    def __init__(self, argname, argtype, expected, function):
        message = f"Expected {expected} in function {function} on argument {argname} but got {argtype} instead."
        super().__init__(message)
class CacheAlreadyExists(Exception):
    """Raised when a cache entry with the given name already exists.

    Parameters
    ----------
    name
        The name that is already taken; it is rendered with ``str()``.
    """

    def __init__(self, name):
        label = str(name)  # stringify first so tuples format as a single value
        message = "Cache %s already exists." % label
        super().__init__(message)
class CacheResult:
    """A single cache entry: a name paired with its stored value.

    Parameters
    ----------
    name: :class:`str`
        The name of the cache. It is converted with ``str()``.
    value: :class:`typing.Any`
        The value of the cache.
    """

    # Instances compare by (mutable) name and value, so they are deliberately
    # unhashable. Python already drops ``__hash__`` when ``__eq__`` is
    # defined; stating it here makes the intent visible.
    __hash__ = None

    def __init__(self, name: str, value: typing.Any):
        self.name = str(name)  # The name of the cache
        self.value = value  # The value of the cache

    def raw(self) -> typing.Dict[str, typing.Any]:
        """Return the entry as a one-item ``{name: value}`` dict."""
        return {self.name: self.value}

    @property
    def size(self):
        """Size of the one-item dict returned by :meth:`raw`.

        Returns the ``humanize.naturalsize`` rendering when the optional
        ``humanize`` package is importable, otherwise the number of bytes.

        NOTE(review): ``dict.__sizeof__`` is shallow, so this does not grow
        with the size of the stored value -- confirm that is intended.
        """
        nbytes = self.raw().__sizeof__()
        try:
            import humanize  # optional dependency (`pip install humanize`)
        except ModuleNotFoundError:
            return nbytes
        return humanize.naturalsize(nbytes)

    def __repr__(self) -> str:
        return "<name: {0.__class__.__name__}={0}, value: {1.__class__.__name__}={1}>".format(self.name, self.value)

    def __str__(self) -> str:
        return self.name

    def __eq__(self, other) -> bool:
        """Two results are equal when both their name and value match."""
        if not isinstance(other, CacheResult):
            # Let Python try the reflected comparison before it falls back
            # to ``False``, instead of vetoing the comparison outright.
            return NotImplemented
        return bool(other.name == self.name and other.value == self.value)
class Cache: | |
def __init__(self): | |
self._cache: typing.Dict[str, typing.Any] = {} # {'Cache Name': 'Cache Result'} --> The raw cache obj. | |
# Optional, just add aliases. | |
self.append = self.add | |
self.rm = self.remove | |
self.insert = self.add | |
self.create_if_not_exists = self.create_if_not_found | |
@property | |
def cache(self) -> typing.List[CacheResult]: # Returns a list of CacheResult (The class we made that handles the cache result) objects. | |
"""Returns the full-kept cache.""" | |
l = [] | |
for k, v in self._cache.items(): # Iterate through the raw cache obj | |
l.append(CacheResult(k, v)) # Append the item to the list | |
return l | |
def create_if_not_found(self, name: str, value: typing.Any, *, case_insensitive: bool=False) -> CacheResult: | |
""" | |
Create a cache if it isn't found. | |
This will try to get the cache at first. | |
If it fails, then it will create a brand new cache. | |
Parameters | |
---------- | |
name: :class:`str` | |
The name of the cache. This will try to get it. | |
value :class:`typing.Any` | |
The value of the cache. This will be used to create | |
the cache. | |
case_insensitive: :class:`bool` | |
Sets the case insensitive. This will be used to get the | |
cache. | |
""" | |
res = self.get(name, case_insensitive=case_insensitive) # Tries to get the cache | |
if res: # If it doesn't return None, | |
return res # Return the CacheResult obj | |
else: | |
return self.add(name, value) # Add the cache | |
def get(self, name: str, *, case_insensitive: bool = False) -> typing.Union[CacheResult, None]: | |
""" | |
Gets a cache and returns a CacheResult object. | |
Parameters | |
---------- | |
name: :class:`str` | |
Gets the cache from name. | |
case_insensitive: :class:`bool` | |
Sets the case sensitive settings for searching the cache. | |
If None, it will default to False. If no result is found then, | |
it will become True. Else it returns the result found. | |
Returns: :class:`typing.Union[CacheResult, None]` | |
""" | |
name = str(name) # Makes it a string, 99.9% for linters :) | |
for k, v in self._cache.items(): # Iterate through raw cache obj | |
if case_insensitive: # If case sensitive has been set to True, | |
if name.lower() == k.lower(): # Make it case sensitive. | |
return CacheResult(k, v) # Return the obj if found. | |
if case_insensitive is None: # Non-case sensitive at first, if no results found, goes to case insensitive. | |
result = self.get(name) # Re-get the cache (Non-case sensitive). | |
if result: | |
return result # Return the result if found. | |
result = self.get(name, case_insensitive=True) # Get the cache but make it case sensitive | |
return result # Make it explicitly return. | |
else: | |
if name == k: # Non-case senstive | |
return CacheResult(k, v) # Return the obj if found. | |
return None # No matches found. | |
@property | |
def size(self): | |
""" | |
Returns the size of the cache. | |
Inspired by HomelessDev/Neutral#4881 | |
""" | |
try: | |
import humanize # Tries to import the humanize pkg. (`pip install humanize` to install it btw) | |
except ModuleNotFoundError: # the module can't be found. | |
return self._cache.__sizeof__() # Returns the cache | |
else: | |
return humanize.naturalsize(self._cache.__sizeof__()) | |
async def _remove_cache(self, name: str, seconds: float) -> None: # This is how our delete_after kwarg works in add function. | |
seconds = float(seconds) # Make the seconds arg explicitly a float so it can sleep | |
name = str(name) # Set name to string | |
await asyncio.sleep(seconds) # Sleeps the function before deleting it | |
try: | |
self.remove(name) # Removes the cache. | |
except CacheNotFound: # Cache cannot be found | |
pass | |
def add(self, name: str, value: typing.Any, *, delete_after: typing.Union[int, float] = None) -> CacheResult: | |
""" | |
Adds an item from the cache. | |
Parameters | |
---------- | |
name: :class:`str` | |
The name of the cache. | |
value: :class:`typing.Any` | |
The value of the cache. | |
delete_after: :class:`typing.Union[int, float]` | |
Tells the bot when to delete the cache. This | |
accepts floats and integers in seconds. Defaults | |
to None (Stays until the cache is cleared or | |
it is removed manually). | |
Returns: :class:`CacheResult` | |
""" | |
seconds = delete_after # idk why, to lazy to change the code back. | |
name = str(name) # Set it to a string. | |
if seconds: # If it isn't None, | |
seconds = float(seconds) # Make it a float | |
if name in self._cache: # The cache already exists in the raw cache obj. | |
raise CacheAlreadyExists(name) # Raise the error. | |
self._cache[name] = value # Add it to cache. | |
if seconds: # If it isn't None | |
if seconds >= 0: # and seconds isn't 0 or lower, | |
asyncio.create_task(self._remove_cache(name, seconds)) # Launch a task in the background. | |
return CacheResult(name, value) # Return the cache obj | |
def remove(self, cache: typing.Union[str, CacheResult], *, case_insensitive = False) -> typing.Dict[str, typing.Any]: | |
""" | |
Removes an item from the cache. | |
Parameters | |
---------- | |
name: :class:`typing.Union[str, CacheResult]` | |
The name of the cache, or a CacheResult obj. | |
case_insensitive: :class:`bool` | |
Sets the case sensitive settings for searching the cache. | |
If None, it will default to False. If no result is found then, | |
it will become True. Else it returns the result found. | |
Raises | |
------ | |
:class:`CacheNotFound` | |
This raises when the cache cannot be found or the | |
cache is not a valid cache. | |
Returns: :class:`typing.Dict[str, typing.Any]` | |
""" | |
if isinstance(cache, CacheResult): | |
name = cache.name # Get the name | |
else: | |
name = cache | |
new_dict_items = self._cache.items() # Load the raw cache items | |
for k, v in new_dict_items: # Iterate through them | |
if case_insensitive: # If it has been set to case insensitive, | |
if name.lower() == k.lower(): # If the name matches the cache name (Case insensitive) | |
self._cache = dict(new_dict_items) # Make it a new dict | |
self._cache.pop(k) # Remove the cache from the dict. | |
return {k: v} # Returns the removed cache. | |
if case_insensitive is None: # Non-case sensitive at first, if no results found, goes to case insensitive. | |
try: | |
result = self.remove_cache(name) # Tries to remove it | |
except CacheNotFound: | |
pass | |
else: | |
return result | |
# Remove the cache. | |
result = self.remove_cache(name, case_insensitive=True) | |
return result # Returns the obj | |
else: | |
if name == k: # Non-case senstive | |
# Removes the cache if found. | |
self._cache = dict(new_dict_items) | |
self._cache.pop(k) | |
return {k: v} | |
raise CacheNotFound(name) # Raise the error if the cache doesn't exist. | |
def clear(self): | |
"""Clears the cache""" | |
self._cache = {} # Set the raw cache to an empty dict. Optionally you can use `self._cache.clear()`. | |
def modify(self, cache: typing.Union[CacheResult, str], *, name: str = None, value: typing.Any = None) -> CacheResult: | |
""" | |
Modify's a Cache Name/Value. | |
Parameters | |
---------- | |
cahce: :class:`typing.Union[CacheResult, str]` | |
The cache object or the name of the cache to be | |
modified. | |
name: :class:`str` | |
The new name of the cache. Defaults to None (Does not change). | |
value: :class:`typing.Any` | |
The new value of the cache. Defaults to None (Does not change). | |
Raises | |
------ | |
:class:`CacheNotFound` | |
This exception raises when an invalid cache has been passed. | |
:class:`CacheInstanceError` | |
This excpetion raises when an invalid instance has been passed. | |
Returns: :class:`CacheResult` | |
""" | |
if isinstance(cache, str): # Check if cache arg is a string | |
cache = self.get(cache) # Get the cache | |
if not cache: # cache var can return None, which means it doesn't exist. | |
raise CacheNotFound(cache) # Cache can't be found, so we need to raise an error. | |
else: # cache arg is not a string. | |
if not isinstance(cache, CacheResult): # Check if cache is a CacheResult obj | |
raise CacheInstanceError( # Raise an error cause it isn't. | |
"cache", cache.__class__.__name__, "CacheResult or str", "Cache.modify" | |
) | |
if cache not in self.cache: # The cache isn't a valid cache. Probably user made the class themselves or removed it somwehere? | |
raise CacheNotFound(getattr(cache, 'name', cache)) # Raise the error. | |
name = name or cache.name # If a name is provided, it will go to it's new name (the one that has been provided), else goes to the cache name by default. | |
value = value or cache.value # If a value is provided, it will go to it's new value (the one that has been provided), else goes to the cache value by default. | |
self._cache[name] = value # Update it on the raw cache obj | |
return CacheResult(name, value) # Return the CacheResult obj. |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment