Skip to content

Instantly share code, notes, and snippets.

@fmartingr
Last active December 18, 2015 02:58
Show Gist options
  • Save fmartingr/5714429 to your computer and use it in GitHub Desktop.
Save fmartingr/5714429 to your computer and use it in GitHub Desktop.
Little model objects using the dataset framework: https://dataset.readthedocs.org/en/latest/index.html
# Imports
import dataset
###
# CONFIG
###
# Connection URL handed to dataset.connect() below.
DATABASE_URL = 'sqlite:///' # In memory database
###
# GLOBALS
###
# Single module-level connection, opened at import time and shared by every
# DDBB instance. A module global like this is generally considered bad
# practice, but it keeps the example short.
engine = dataset.connect(DATABASE_URL)
###
# OBJECTS
###
class DDBB(object):
    """Base object that attaches the shared module-level engine to instances.

    Any positional or keyword arguments are accepted and ignored so that
    subclasses can forward their own constructor arguments unchanged.
    """

    def __init__(self, *args, **kwargs):
        # Only reading the module global, so no ``global`` statement is needed.
        self._ddbb = engine
class DatasetObject(DDBB):
    """Small active-record style base class backed by a ``dataset`` table.

    Subclasses configure the model through class attributes:

    * ``_table``   -- name of the backing table (required, non-empty).
    * ``_fields``  -- list of ``(name, default)`` tuples; every entry becomes
      an instance attribute initialised from the constructor keyword
      arguments or, failing that, from ``default``.
    * ``_indexes`` -- list of column names handed to ``create_index``.

    The primary key column is appended to the field list automatically.
    """

    # The primary key name is configurable only for scalability purposes.
    # DO NOT MODIFY
    _primary_key = 'id'
    _table = ''
    _fields = []  # List of (name, default) tuples
    _indexes = []  # List of fields

    def __init__(self, *args, **kwargs):
        """Bind the instance to its table and populate the field attributes.

        Keyword arguments whose names match a declared field (or the primary
        key) set the corresponding attribute; other keywords are ignored.

        Raises:
            Exception: if the subclass did not define ``_table``.
        """
        # Validate the configuration before touching the database so that a
        # misconfigured subclass never asks the engine for a table named ''.
        if not self._table:
            raise Exception('%s _table parameter is not defined!' % (
                self.__class__.__name__
            ))

        # The superclass __init__ establishes the database connection.
        super(DatasetObject, self).__init__(*args, **kwargs)
        self._connection = self._ddbb[self._table]

        # Create table indexes (custom index names are not supported).
        if self._indexes:
            self._connection.create_index(self._indexes)

        # Shadow the class-level list with a per-instance copy. Appending the
        # primary key to the class attribute itself would grow the shared
        # list by one entry on every instantiation.
        self._fields = self._declared_fields()
        self._assign_fields(kwargs)

    def _declared_fields(self):
        """Return a fresh list of the class fields plus the primary key."""
        declared = list(type(self)._fields)
        if not any(field[0] == self._primary_key for field in declared):
            declared.append((self._primary_key, None))
        return declared

    def _assign_fields(self, values):
        """Set every field attribute from ``values`` or its declared default."""
        for field in self._fields:
            setattr(self, field[0], values.get(field[0], field[1]))

    def save(self):
        """Persist the object: insert when it has no primary key, else update.

        After an insert the value returned by the table's ``insert`` call is
        stored on the instance as its primary key.
        """
        pk = self._primary_key
        data = dict(
            (field[0], getattr(self, field[0]))
            for field in self._fields
            if field[0] != pk
        )
        object_id = getattr(self, pk, None)
        if object_id is None:
            # Never saved before: let the database assign the key.
            setattr(self, pk, self._connection.insert(data))
        else:
            data[pk] = object_id
            self._connection.update(data, [pk])

    def delete(self):
        """Delete the stored row and reset the instance to its defaults.

        Returns:
            ``False`` when the object has no primary key (it was never
            saved); otherwise whatever the table's ``delete`` call returns.
        """
        pk = self._primary_key
        object_id = getattr(self, pk, None)
        if object_id is None:
            return False
        result = self._connection.delete(**{pk: object_id})
        # Reset the attributes without re-running __init__, which would
        # needlessly re-create indexes and breaks subclasses whose
        # constructor takes required arguments.
        self._assign_fields({})
        return result

    def find(self, *args, **kwargs):
        """Return a list of model instances for the rows matching the query.

        All arguments are forwarded unchanged to the table's ``find``.
        """
        rows = self._connection.find(*args, **kwargs)
        return [self.__class__(**row) for row in rows]

    def find_one(self, *args, **kwargs):
        """Return one model instance for the first matching row, or ``None``.

        All arguments are forwarded unchanged to the table's ``find_one``.
        """
        row = self._connection.find_one(*args, **kwargs)
        if not row:
            return None
        return self.__class__(**row)
class Person(DatasetObject):
    """Example model stored in the ``people`` table."""

    _table = 'people'
    # Each entry is a (field name, default value) pair.
    _fields = [('name', None), ('age', None)]
    _indexes = ['name', 'id']
# Example usage (runs at import time).
# Create an object; keyword arguments populate the declared fields.
p = Person(name="Felipe", age=26)
# Persist it.
p.save()
# Find objects, a.k.a. dataset[table].find(); returns a list of Person objects.
# A throwaway Person() is created only to reach the instance method.
# NOTE(review): `port` is not one of Person's declared fields (name, age);
# this looks like a leftover from another example -- confirm the filter.
p2 = Person().find(port=7000)
# Find one object, a.k.a. dataset[table].find_one(); returns a Person or None.
p3 = Person().find_one(id=1)
# Remove the object's row from the table.
p.delete()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment