Created
January 26, 2022 00:09
-
-
Save Saigesp/44ddbbbb9670907768981a0f99840193 to your computer and use it in GitHub Desktop.
Proof of concept (POC): using CSV files as a simple database
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import csv
import os
from typing import Optional
# Root directory (relative to the current working directory) under which
# each database gets its own sub-folder.
DATA_FOLDER = os.path.join('data')
class CsvDB:
    """Minimal record store persisted as a single CSV file.

    Records are plain dicts kept in ``self.data`` and keyed by record id.
    The backing file is ``<DATA_FOLDER>/<dbname>/db.csv``; it is read once
    when the object is constructed and rewritten in full by
    :meth:`dump_file`.

    Values read back from disk are always strings (the ``csv`` module does
    no type conversion), so queries against loaded rows must compare
    against string values.
    """

    def __init__(self, dbname: str, verbose: bool = True):
        """Open (or create) the database called *dbname*.

        Args:
            dbname: Name of the sub-folder of ``DATA_FOLDER`` holding the file.
            verbose: When true, progress messages are printed to stdout.
        """
        self.verbose = verbose
        self.dbname = dbname
        os.makedirs(os.path.join(DATA_FOLDER, self.dbname), exist_ok=True)
        self.file = os.path.join(DATA_FOLDER, self.dbname, 'db.csv')
        self._load_file()

    def _msg(self, msg: str):
        """Print *msg* if the instance was created with ``verbose=True``."""
        if self.verbose:
            print(msg)

    def _load_file(self):
        """Fill ``self.data`` from the CSV file, creating the file if missing.

        Raises:
            KeyError: If the file contains rows but has no ``id`` column.
            OSError: If the file exists but cannot be read, or cannot be
                created.
        """
        self.data = {}
        try:
            # newline='' lets the csv module handle line endings itself.
            with open(self.file, 'r', newline='') as f:
                for row in csv.DictReader(f, delimiter=','):
                    self.data[row['id']] = row
        except FileNotFoundError:
            # Only a missing file means "start empty"; any other OS error
            # propagates so an unreadable file is never truncated.
            with open(self.file, 'w', newline=''):
                pass
            self._msg(f'File {self.file} created')
        else:
            self._msg(f'File {self.file} loaded')

    def dump_file(self):
        """Rewrite the CSV file from ``self.data``, rows ordered by id.

        The header is the union of all record keys (see
        :meth:`get_columns`); fields a record lacks are written as empty
        strings.
        """
        columns = self.get_columns()
        # Sort before opening the file so a failure here cannot leave a
        # truncated file behind. Ids are compared as strings because that
        # is what they become after a round trip through the file.
        records = sorted(
            self.data.values(),
            key=lambda rec: str(rec.get('id', '')),
        )
        with open(self.file, 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(columns)
            writer.writerows(
                [rec.get(col, '') for col in columns] for rec in records
            )
        self._msg(f'File {self.file} updated')

    def get_columns(self):
        """Return every field name used by any record, in first-seen order."""
        seen = {}
        for record in self.data.values():
            # dict.fromkeys keeps insertion order and deduplicates in O(1).
            seen.update(dict.fromkeys(record))
        return list(seen)

    @staticmethod
    def _matches(item: dict, query: dict) -> bool:
        """Tell whether *item* equals *query* on every queried field.

        A field missing from *item* is treated as the empty string.
        """
        return all(item.get(field, '') == val for field, val in query.items())

    def get_id(self, id: str):
        """Return the record stored under *id*, or ``None`` if absent."""
        return self.data.get(id, None)

    def get_one(self, query: dict):
        """Return the first record matching *query*, or ``None``.

        An empty query matches every record.
        """
        for item in self.data.values():
            if self._matches(item, query):
                return item
        return None

    def get_many(self, query: dict):
        """Return a list of all records matching *query*."""
        return [item for item in self.data.values() if self._matches(item, query)]

    def count(self, query: dict):
        """Return how many records match *query*."""
        return len(self.get_many(query))

    def update_one(self, id: str, item: Optional[dict] = None,
                   force: bool = False, dump: bool = True):
        """Insert a record, or fully replace an existing one when *force*.

        Args:
            id: Key under which the record is stored.
            item: Record contents. A copy is stored; ``None`` is treated as
                an empty record. An ``id`` field is added if missing so the
                record can be written to and reloaded from the file.
            force: Replace the record if *id* already exists. Without it an
                existing record is left untouched.
            dump: Write the file after the change.

        Returns:
            Always ``None``.
        """
        if id in self.data and not force:
            # Prevent full update on existing record without force
            return None
        record = dict(item) if item else {}
        record.setdefault('id', id)
        self.data[id] = record
        if dump:
            self.dump_file()
        return None

    def patch_one(self, id: str, data: Optional[dict] = None,
                  force: bool = False, dump: bool = True):
        """Partially update the existing record stored under *id*.

        Args:
            id: Key of the record to patch; nothing happens if it is absent.
            data: Fields to set. Without *force*, a field is only written
                when the record's current value for it is empty or missing.
            force: Overwrite fields that already hold a value.
            dump: Write the file afterwards.

        Returns:
            Always ``None``.
        """
        if id not in self.data:
            # Document must exist
            return None
        if data:
            current = self.data[id]
            # Work out the changes before removing the record so an error
            # here cannot drop it from the store.
            changes = {
                key: val for key, val in data.items()
                if force or not current.get(key)
            }
            item = self.data.pop(id)
            item.update(changes)
            # Re-key under the record's own id (it may have been patched);
            # fall back to the lookup key if the record has no id field.
            self.data[item.get('id', id)] = item
        if dump:
            self.dump_file()
        return None

    def update_many(self, items: list, force: bool = False, dump: bool = True):
        """Apply :meth:`update_one` to each item, writing the file once.

        Raises:
            KeyError: If an item has no ``id`` field.
        """
        for item in items:
            self.update_one(id=item['id'], item=item, force=force, dump=False)
        if dump:
            self.dump_file()

    def delete_one(self, id: str, dump: bool = True):
        """Remove the record stored under *id*.

        Returns:
            The removed record, or ``None`` if there was none.
        """
        removed = self.data.pop(id, None)
        if dump:
            self.dump_file()
        return removed
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment