Skip to content

Instantly share code, notes, and snippets.

@patx
Created January 9, 2025 08:47
Show Gist options
  • Select an option

  • Save patx/3ad47fc3814d7293feb902f6ab49c48f to your computer and use it in GitHub Desktop.

Select an option

Save patx/3ad47fc3814d7293feb902f6ab49c48f to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright 2019 Harrison Erd
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from this
# software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
# THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
# OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
# WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
# OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
# EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import sys
import os
import signal
import shutil
import json
from tempfile import NamedTemporaryFile
from threading import Thread
def load(location, auto_dump, sig=True):
'''Return a pickledb object. location is the path to the json file.'''
return PickleDB(location, auto_dump, sig)
class PickleDB(object):
key_string_error = TypeError('Key/name must be a string!')
def __init__(self, location, auto_dump, sig):
'''Creates a database object and loads the data from the location path.
If the file does not exist it will be created on the first update.
'''
self.load(location, auto_dump)
self.dthread = None
if sig:
self.set_sigterm_handler()
def __getitem__(self, item):
'''Syntax sugar for get()'''
return self.get(item)
def __setitem__(self, key, value):
'''Sytax sugar for set()'''
return self.set(key, value)
def __delitem__(self, key):
'''Sytax sugar for rem()'''
return self.rem(key)
def set_sigterm_handler(self):
'''Assigns sigterm_handler for graceful shutdown during dump()'''
def sigterm_handler(*args, **kwargs):
if self.dthread is not None:
self.dthread.join()
sys.exit(0)
signal.signal(signal.SIGTERM, sigterm_handler)
def load(self, location, auto_dump):
'''Loads, reloads or changes the path to the db file'''
location = os.path.expanduser(location)
self.loco = location
self.auto_dump = auto_dump
if os.path.exists(location):
self._loaddb()
else:
self.db = {}
return True
def _dump(self):
'''Dump to a temporary file, and then move to the actual location'''
with NamedTemporaryFile(mode='wt', delete=False) as f:
json.dump(self.db, f)
if os.stat(f.name).st_size != 0:
shutil.move(f.name, self.loco)
def dump(self):
'''Force dump memory db to file'''
self.dthread = Thread(target=self._dump)
self.dthread.start()
self.dthread.join()
return True
def _loaddb(self):
'''Load or reload the json info from the file'''
try:
self.db = json.load(open(self.loco, 'rt'))
except ValueError:
if os.stat(self.loco).st_size == 0: # Error raised because file is empty
self.db = {}
else:
raise # File is not empty, avoid overwriting it
def _autodumpdb(self):
'''Write/save the json dump into the file if auto_dump is enabled'''
if self.auto_dump:
self.dump()
def set(self, key, value):
'''Set the str value of a key'''
if isinstance(key, str):
self.db[key] = value
self._autodumpdb()
return True
else:
raise self.key_string_error
def get(self, key):
'''Get the value of a key'''
try:
return self.db[key]
except KeyError:
return False
def getall(self):
'''Return a list of all keys in db'''
return self.db.keys()
def exists(self, key):
'''Return True if key exists in db, return False if not'''
return key in self.db
def rem(self, key):
'''Delete a key'''
if not key in self.db: # return False instead of an exception
return False
del self.db[key]
self._autodumpdb()
return True
def totalkeys(self, name=None):
'''Get a total number of keys, lists, and dicts inside the db'''
if name is None:
total = len(self.db)
return total
else:
total = len(self.db[name])
return total
def append(self, key, more):
'''Add more to a key's value'''
tmp = self.db[key]
self.db[key] = tmp + more
self._autodumpdb()
return True
def lcreate(self, name):
'''Create a list, name must be str'''
if isinstance(name, str):
self.db[name] = []
self._autodumpdb()
return True
else:
raise self.key_string_error
def ladd(self, name, value):
'''Add a value to a list'''
self.db[name].append(value)
self._autodumpdb()
return True
def lextend(self, name, seq):
'''Extend a list with a sequence'''
self.db[name].extend(seq)
self._autodumpdb()
return True
def lgetall(self, name):
'''Return all values in a list'''
return self.db[name]
def lget(self, name, pos):
'''Return one value in a list'''
return self.db[name][pos]
def lrange(self, name, start=None, end=None):
'''Return range of values in a list '''
return self.db[name][start:end]
def lremlist(self, name):
'''Remove a list and all of its values'''
number = len(self.db[name])
del self.db[name]
self._autodumpdb()
return number
def lremvalue(self, name, value):
'''Remove a value from a certain list'''
self.db[name].remove(value)
self._autodumpdb()
return True
def lpop(self, name, pos):
'''Remove one value in a list'''
value = self.db[name][pos]
del self.db[name][pos]
self._autodumpdb()
return value
def llen(self, name):
'''Returns the length of the list'''
return len(self.db[name])
def lappend(self, name, pos, more):
'''Add more to a value in a list'''
tmp = self.db[name][pos]
self.db[name][pos] = tmp + more
self._autodumpdb()
return True
def lexists(self, name, value):
'''Determine if a value exists in a list'''
return value in self.db[name]
def dcreate(self, name):
'''Create a dict, name must be str'''
if isinstance(name, str):
self.db[name] = {}
self._autodumpdb()
return True
else:
raise self.key_string_error
def dadd(self, name, pair):
'''Add a key-value pair to a dict, "pair" is a tuple'''
self.db[name][pair[0]] = pair[1]
self._autodumpdb()
return True
def dget(self, name, key):
'''Return the value for a key in a dict'''
return self.db[name][key]
def dgetall(self, name):
'''Return all key-value pairs from a dict'''
return self.db[name]
def drem(self, name):
'''Remove a dict and all of its pairs'''
del self.db[name]
self._autodumpdb()
return True
def dpop(self, name, key):
'''Remove one key-value pair in a dict'''
value = self.db[name][key]
del self.db[name][key]
self._autodumpdb()
return value
def dkeys(self, name):
'''Return all the keys for a dict'''
return self.db[name].keys()
def dvals(self, name):
'''Return all the values for a dict'''
return self.db[name].values()
def dexists(self, name, key):
'''Determine if a key exists or not in a dict'''
return key in self.db[name]
def dmerge(self, name1, name2):
'''Merge two dicts together into name1'''
first = self.db[name1]
second = self.db[name2]
first.update(second)
self._autodumpdb()
return True
def deldb(self):
'''Delete everything from the database'''
self.db = {}
self._autodumpdb()
return True
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment