-
-
Save datavudeja/714a55109e36f9353d50014084d46fd1 to your computer and use it in GitHub Desktop.
sqlite model
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sqlite3

from utils import log

# Path of the SQLite database file; a relative path is resolved against the
# process's current working directory by sqlite3.connect().
db_path = 'model.sqlite'

# Single module-wide connection shared by every model operation in this file.
conn = sqlite3.connect(db_path)
class Model(object):
    """Minimal active-record style base class on top of the module-level ``conn``.

    Subclasses extend ``fields`` with ``(column_name, python_type)`` pairs.
    Every method addresses the table named after the lower-cased class name
    and commits its own writes.
    """

    # Column declarations shared by every model; subclasses append their own.
    fields = [
        ('id', int),
    ]

    @classmethod
    def new(cls, form):
        """Insert ``form`` as one row and return an instance mirroring it.

        :param form: mapping of column name -> value to store.
        :return: a ``cls`` instance carrying every item of ``form`` as an
            attribute, plus ``id`` taken from the inserted row when ``form``
            did not supply one.
        :raises sqlite3.Error: whatever the database raises, e.g. for an
            unknown column or a violated constraint.
        """
        table = cls.__name__.lower()
        # Values are bound as parameters, but column names cannot be.  Doubling
        # any embedded backtick keeps a key inside its quoted identifier so it
        # cannot terminate the identifier and smuggle in extra SQL.
        columns = ['`{}`'.format(str(k).replace('`', '``')) for k in form]
        if columns:
            sql_insert = '''
            INSERT INTO
                `{}` ({})
            VALUES
                ({});
            '''.format(
                table,
                ', '.join(columns),
                ', '.join(['?'] * len(columns))
            )
        else:
            # "() VALUES ()" is not valid SQL; an empty form means "all defaults".
            sql_insert = '''
            INSERT INTO
                `{}`
            DEFAULT VALUES;
            '''.format(table)
        log('insert', sql_insert)
        cursor = conn.execute(sql_insert, tuple(form.values()))
        conn.commit()

        m = cls()
        for k, v in form.items():
            setattr(m, k, v)
        if 'id' not in form:
            # Expose the generated primary key so the caller can use the
            # instance with find/update/delete straight away.
            m.id = cursor.lastrowid
        return m

    @classmethod
    def all(cls):
        """Return every row of the table as a list of ``cls`` instances.

        Only the columns declared in ``cls.fields`` are selected; each becomes
        an attribute of the same name.
        """
        keys = [f[0] for f in cls.fields]
        select_columns = ', '.join('`{}`'.format(k) for k in keys)
        sql_select = '''
        SELECT
            {}
        FROM
            `{}`
        '''.format(select_columns, cls.__name__.lower())
        # Read-only query: there is nothing to commit.
        ms = []
        for row in conn.execute(sql_select):
            m = cls()
            for key, value in zip(keys, row):
                setattr(m, key, value)
            ms.append(m)
        return ms

    @classmethod
    def update(cls, id, form):
        """Update the row whose primary key is ``id`` with values from ``form``.

        Keys of ``form`` that are not declared in ``cls.fields`` are ignored.
        When no key is a declared field there is nothing to write and the call
        returns without touching the database.

        :param id: primary key of the row to change.
        :param form: mapping of column name -> new value.
        """
        declared = set(f[0] for f in cls.fields)
        valid_fields = [k for k in form if k in declared]
        if not valid_fields:
            # An empty SET clause would be a syntax error.
            return
        valid_values = tuple(form[k] for k in valid_fields)
        update_columns = ', '.join(
            '`{}`=?\n'.format(f) for f in valid_fields
        )
        sql_update = '''
        UPDATE
            `{}`
        SET
            {}
        WHERE
            `id`=?
        '''.format(cls.__name__.lower(), update_columns)
        log('sql update', sql_update)
        conn.execute(sql_update, valid_values + (id,))
        conn.commit()

    @classmethod
    def delete(cls, id):
        """Delete the row whose primary key is ``id`` (no error if absent)."""
        sql_delete = '''
        DELETE FROM
            `{}`
        WHERE
            id=?
        '''.format(cls.__name__.lower())
        conn.execute(sql_delete, (id,))
        conn.commit()

    @staticmethod
    def _matches(m, criteria):
        """Return True when ``m`` equals every ``attribute=value`` in ``criteria``.

        An empty ``criteria`` matches nothing.
        """
        return bool(criteria) and all(
            v == getattr(m, k) for k, v in criteria.items()
        )

    @classmethod
    def find_by(cls, **kwargs):
        """Return the first instance matching all ``kwargs``, or ``None``.

        Filtering happens in Python over ``cls.all()``; calling with no
        keyword arguments returns ``None``.
        """
        log('kwargs, ', kwargs, type(kwargs))
        for m in cls.all():
            if cls._matches(m, kwargs):
                return m
        return None

    @classmethod
    def find(cls, id):
        """Return the instance whose ``id`` equals ``id``, or ``None``."""
        return cls.find_by(id=id)

    @classmethod
    def find_all(cls, **kwargs):
        """Return the list of instances matching all ``kwargs``.

        Filtering happens in Python over ``cls.all()``; calling with no
        keyword arguments returns an empty list.
        """
        log('kwargs, ', kwargs, type(kwargs))
        return [m for m in cls.all() if cls._matches(m, kwargs)]

    def __repr__(self):
        """Return a multi-line dump of the class name and instance attributes."""
        classname = self.__class__.__name__
        properties = ['{}: ({})'.format(k, v) for k, v in self.__dict__.items()]
        s = '\n'.join(properties)
        return '< {}\n{} \n>\n'.format(classname, s)
class User(Model):
    """Model for rows of the ``user`` table."""

    # Inherit the ``id`` column from Model and add two string-typed columns.
    fields = Model.fields + [
        ('username', str),
        ('password', str),
    ]
def reset(model):
    """Drop and recreate the table backing ``model``, discarding its rows.

    The table is named after the lower-cased class name.  It always gets an
    auto-incrementing integer primary key ``id``; every other entry of
    ``model.fields`` becomes a ``NOT NULL`` column, ``INTEGER`` when the
    declared type is ``int`` and ``TEXT`` otherwise.

    :param model: a class exposing ``__name__`` and a ``fields`` list of
        ``(column_name, python_type)`` pairs.
    """
    table_name = model.__name__.lower()
    sql_drop = '''
    DROP TABLE IF EXISTS `{}`;
    '''.format(table_name)
    log('sql drop', sql_drop)
    conn.execute(sql_drop)

    # Collect every column definition, the primary key included, and join
    # once: a model that declares only ``id`` then yields valid SQL instead of
    # a dangling comma before the closing parenthesis.
    columns = ['`id` INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT']
    for name, field_type in model.fields:
        if name == 'id':
            continue
        column_type = 'INTEGER' if field_type is int else 'TEXT'
        columns.append('`{}` {} NOT NULL'.format(name, column_type))
    create_columns = ',\n        '.join(columns)
    sql_create = '''
    CREATE TABLE `{}` (
        {}
    )
    '''.format(table_name, create_columns)
    log('sql create', sql_create)
    conn.execute(sql_create)
    conn.commit()
def test():
    """Smoke-test insert, list, delete, update and lookup on ``User``.

    The id assertions assume the ``user`` table was emptied by ``reset(User)``
    immediately beforehand, so the two inserted rows receive ids 1 and 2.
    """
    form = dict(
        username='test',
        password='123',
    )
    User.new(form)
    User.new(form)
    assert len(User.all()) == 2

    User.delete(1)
    assert len(User.all()) == 1

    form = dict(
        username='ttttttt',
    )
    User.update(2, form)
    u = User.find_by(username='ttttttt')
    # Fail on the lookup itself rather than on attribute access of None.
    assert u is not None
    assert u.id == 2
if __name__ == '__main__':
    # Start from an empty ``user`` table so the id assertions in test() hold.
    reset(User)
    test()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment