Skip to content

Instantly share code, notes, and snippets.

@datavudeja
Forked from bao-qian/model.py
Created February 12, 2025 14:24
Show Gist options
  • Save datavudeja/714a55109e36f9353d50014084d46fd1 to your computer and use it in GitHub Desktop.
Save datavudeja/714a55109e36f9353d50014084d46fd1 to your computer and use it in GitHub Desktop.
sqlite model
import sqlite3
from utils import log
# Path of the SQLite database file; being relative, it resolves against the
# current working directory of the process.
db_path = 'model.sqlite'
# Single connection opened at import time and shared by every model
# operation and by reset() in this module.
conn = sqlite3.connect(db_path)
class Model(object):
    """Minimal active-record base class backed by the module-level ``conn``.

    Each subclass maps to a table named after the lowercased class name and
    declares its columns in ``fields`` as ``(column name, Python type)``
    pairs. Every write is committed immediately.
    """

    # (column name, Python type) pairs; subclasses extend this list.
    fields = [
        ('id', int),
    ]

    @classmethod
    def _table_name(cls):
        """Return the table name used for this model (lowercased class name)."""
        return cls.__name__.lower()

    @staticmethod
    def _matches(m, criteria):
        """Return True when *m* satisfies EVERY ``attribute == value`` pair.

        An empty *criteria* never matches, so calling ``find_by()`` or
        ``find_all()`` without arguments keeps returning nothing.
        """
        return bool(criteria) and all(
            v == getattr(m, k) for k, v in criteria.items()
        )

    @classmethod
    def new(cls, form):
        """Insert *form* as a new row and return the matching instance.

        Args:
            form: mapping of column name to value. Values are bound as SQL
                parameters.

        Returns:
            An instance carrying every key of *form* as an attribute, plus
            ``id`` set to the row id generated by the insert (unless *form*
            supplies its own ``id``).
        """
        # NOTE(review): column names come straight from the form keys and are
        # interpolated into the SQL text; only pass trusted keys.
        insert_columns = ', '.join('`{}`'.format(k) for k in form)
        insert_values = ', '.join(['?'] * len(form))
        sql_insert = '''
        INSERT INTO
            `{}` ({})
        VALUES
            ({});
        '''.format(
            cls._table_name(),
            insert_columns,
            insert_values
        )
        log('insert', sql_insert)
        cursor = conn.execute(sql_insert, tuple(form.values()))
        conn.commit()
        m = cls()
        # Expose the generated primary key; an explicit form['id'] set below
        # takes precedence.
        m.id = cursor.lastrowid
        for k, v in form.items():
            setattr(m, k, v)
        return m

    @classmethod
    def all(cls):
        """Return every row of the table as a list of instances.

        Only the columns declared in ``fields`` are selected; each becomes an
        attribute on the instance.
        """
        keys = [name for name, _ in cls.fields]
        select_columns = ', '.join('`{}`'.format(k) for k in keys)
        sql_select = '''
        SELECT
            {}
        FROM
            `{}`
        '''.format(select_columns, cls._table_name())
        cursor = conn.execute(sql_select)
        # Kept from the original implementation: commits any transaction that
        # is still open on the shared connection.
        conn.commit()
        ms = []
        for row in cursor:
            m = cls()
            for key, value in zip(keys, row):
                setattr(m, key, value)
            ms.append(m)
        return ms

    @classmethod
    def update(cls, id, form):
        """Update the row whose primary key is *id* with the values in *form*.

        Keys of *form* that are not declared in ``fields`` are ignored. When
        no key is recognised there is nothing to write, so no statement is
        executed (an empty ``SET`` clause would be invalid SQL).
        """
        known = {name for name, _ in cls.fields}
        valid = [(k, v) for k, v in form.items() if k in known]
        if not valid:
            return
        update_columns = ',\n            '.join(
            '`{}`=?'.format(k) for k, _ in valid
        )
        # Same lowercased table name as every other method.
        sql_update = '''
        UPDATE
            `{}`
        SET
            {}
        WHERE
            `id`=?
        '''.format(cls._table_name(), update_columns)
        log('sql update', sql_update)
        conn.execute(sql_update, tuple(v for _, v in valid) + (id,))
        conn.commit()

    @classmethod
    def delete(cls, id):
        """Delete the row whose primary key is *id* (no error if absent)."""
        sql_delete = '''
        DELETE FROM
            `{}`
        WHERE
            id=?
        '''.format(cls._table_name())
        conn.execute(sql_delete, (id,))
        conn.commit()

    @classmethod
    def find_by(cls, **kwargs):
        """Return the first instance matching ALL keyword criteria, or None.

        Matching is done in Python over ``all()``. Called without criteria it
        returns None.
        """
        log('kwargs, ', kwargs, type(kwargs))
        for m in cls.all():
            if cls._matches(m, kwargs):
                return m
        return None

    @classmethod
    def find(cls, id):
        """Return the instance whose ``id`` equals *id*, or None."""
        return cls.find_by(id=id)

    @classmethod
    def find_all(cls, **kwargs):
        """Return the list of instances matching ALL keyword criteria.

        Called without criteria it returns an empty list.
        """
        log('kwargs, ', kwargs, type(kwargs))
        return [m for m in cls.all() if cls._matches(m, kwargs)]

    def __repr__(self):
        """Return a multi-line dump of the instance attributes for debugging."""
        classname = self.__class__.__name__
        properties = ['{}: ({})'.format(k, v) for k, v in self.__dict__.items()]
        s = '\n'.join(properties)
        return '< {}\n{} \n>\n'.format(classname, s)
class User(Model):
    """Model for user accounts: the base columns plus a username and password."""

    # Start from a copy of the inherited columns, then add the user-specific ones.
    fields = list(Model.fields)
    fields += [
        ('username', str),
        ('password', str),
    ]
def reset(model):
    """Drop and re-create the table backing *model*, discarding all its rows.

    The table is named after the lowercased class name. It always gets an
    auto-incrementing integer ``id`` primary key; every other entry of
    ``model.fields`` becomes a ``NOT NULL`` column, ``INTEGER`` for ``int``
    fields and ``TEXT`` for anything else.

    Args:
        model: a class exposing ``__name__`` and a ``fields`` list of
            ``(column name, Python type)`` pairs.
    """
    table_name = model.__name__.lower()
    # Quote the identifier here as well, matching the CREATE statement below.
    sql_drop = '''
    DROP TABLE IF EXISTS `{}`;
    '''.format(table_name)
    log('sql drop', sql_drop)
    conn.execute(sql_drop)

    # Build the full column list first and join once, so a model that only
    # declares `id` does not leave a dangling comma in the statement.
    columns = ['`id` INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT']
    for name, field_type in model.fields:
        if name == 'id':
            continue
        column_type = 'INTEGER' if field_type is int else 'TEXT'
        columns.append('`{}` {} NOT NULL'.format(name, column_type))
    create_columns = ',\n        '.join(columns)
    sql_create = '''
    CREATE TABLE `{}` (
        {}
    )
    '''.format(table_name, create_columns)
    log('sql create', sql_create)
    conn.execute(sql_create)
    conn.commit()
def test():
    """Smoke-test the CRUD helpers through the ``User`` model.

    The asserted ids (1 and 2) assume the table is empty beforehand, as it
    is when ``reset(User)`` has just been called.
    """
    credentials = {
        'username': 'test',
        'password': '123',
    }
    # Insert the same record twice; both rows must be listed.
    User.new(credentials)
    User.new(credentials)
    assert len(User.all()) == 2

    # Removing the first row leaves exactly one behind.
    User.delete(1)
    assert len(User.all()) == 1

    # Rename the surviving row and look it up by its new username.
    changes = {'username': 'ttttttt'}
    User.update(2, changes)
    found = User.find_by(username='ttttttt')
    assert found.id == 2
if __name__ == '__main__':
    # Run as a script: rebuild the User table from scratch, then execute the
    # smoke test against it.
    reset(User)
    test()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment