Skip to content

Instantly share code, notes, and snippets.

@Saigesp
Created January 26, 2022 00:09
Show Gist options
  • Save Saigesp/44ddbbbb9670907768981a0f99840193 to your computer and use it in GitHub Desktop.
Save Saigesp/44ddbbbb9670907768981a0f99840193 to your computer and use it in GitHub Desktop.
POC to use csvs as database
import os
import csv
DATA_FOLDER = os.path.join('data')
class CsvDB:
def __init__(self, dbname: str, verbose: bool=True):
self.verbose = verbose
self.dbname = dbname
os.makedirs(os.path.join(DATA_FOLDER, self.dbname), exist_ok=True)
self.file = os.path.join(DATA_FOLDER, self.dbname, 'db.csv')
self._load_file()
def _msg(self, msg: str):
if self.verbose: print(msg)
def _load_file(self):
try:
with open(self.file, 'r') as f:
reader = csv.DictReader(f, delimiter=',')
self.data = {}
for row in reader:
self.data[row['id']] = row
self._msg(f'File {self.file} loaded')
except IOError:
open(self.file, "w").close
self._msg(f'File {self.file} created')
self.data = {}
def dump_file(self):
columns = self.get_columns()
with open(self.file, 'w') as f:
writer = csv.writer(f)
writer.writerow(columns)
for val in sorted(self.data.values(), key=lambda x: x['id']):
writer.writerow([val.get(col, '') for col in columns])
self._msg(f'File {self.file} updated')
def get_columns(self):
columns = []
for key, val in self.data.items():
for col in val.keys():
if not col in columns:
columns.append(col)
return columns
def get_id(self, id: str):
return self.data.get(id, None)
def get_one(self, query: dict):
for id, item in self.data.items():
if all([item.get(field, '') == val for field, val in query.items()]):
return item
return None
def get_many(self, query: dict):
results = [val for key, val in self.data.items()]
for key, val in query.items():
results = [r for r in results if r.get(key, '') == val]
return results
def count(self, query: dict):
return len(self.get_many(query))
def update_one(self, id: str, item: dict=None, force: bool=False, dump=True):
exists = id in self.data
if exists and not force:
# Prevent full update on existing record without force
return None
if not exists or force:
# Full update of the record
self.data[id] = item
if dump: self.dump_file()
def patch_one(self, id: str, data: dict=None, force: bool=False, dump=True):
if not id in self.data:
# Document must exists
return None
elif data:
# Partial update of the record
item = self.data.pop(id, {})
for key, val in data.items():
if item.get(key, None) and not force:
continue
item[key] = val
self.data[item['id']] = item
if dump: self.dump_file()
def update_many(self, items: list, force: bool=False, dump=True):
for item in items:
self.update_one(id=item['id'], item=item, force=force, dump=False)
if dump: self.dump_file()
def delete_one(self, id: str, dump=True):
removed = self.data.pop(id, None)
if dump: self.dump_file()
return removed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment