Created
January 18, 2010 16:54
-
-
Save apg/280176 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# broken, hacked up version of
# http://bret.appspot.com/entry/how-friendfeed-uses-mysql for sqlite
# database = bequas.Database('something.db')
# database.initialize(); database.create_index('hello', unique=True);
# database.insert({'hello': 'world'})
import sqlite3
import uuid
import datetime

try:
    import json
except ImportError:
    # Only a missing stdlib json module should trigger the fallback; a bare
    # except would also hide unrelated failures such as KeyboardInterrupt.
    import simplejson as json

# Enables the diagnostic printing of generated SQL in this module.
DEBUG = True
class Backend(object):
    """SQLite backend: owns the connection and executes all SQL.

    Every statement that changes the database is committed immediately, so
    writes survive ``close()``.

    Table, column and index names are interpolated into the SQL text because
    SQLite cannot bind identifiers as parameters; pass only trusted names.
    Values are always bound with ``?`` placeholders.
    """

    def __init__(self, db, **kwargs):
        """Open the SQLite connection.

        db -- owning object; ``create_index`` calls its
              ``_table_name(column)`` and ``_index_name(column)`` methods.
        db_name (keyword) -- database path handed to ``sqlite3.connect``.
              Defaults to ``''``, which SQLite treats as a private temporary
              database.
        """
        self._db = db
        self._conn = sqlite3.connect(kwargs.pop('db_name', ''))

    def close(self):
        """Close the underlying connection."""
        self._conn.close()

    def _debug(self, label, *parts):
        """Print ``label`` followed by ``parts`` when DEBUG is enabled."""
        if DEBUG:
            print(' '.join([label] + [str(part) for part in parts]))

    def _run(self, sql, params=None):
        """Execute one statement and commit it (roll back on failure)."""
        cursor = self._conn.cursor()
        try:
            cursor.execute(sql, params or ())
        except Exception:
            # A failed statement (e.g. a UNIQUE violation) can leave the
            # implicit transaction open; close it so no lock is held.
            self._conn.rollback()
            raise
        else:
            self._conn.commit()
        finally:
            cursor.close()

    def _schema_names(self, kind, label, where, params):
        """Return the names of all ``sqlite_master`` entries of type ``kind``.

        ``where`` is an optional extra SQL condition ANDed onto the query and
        ``params`` the values bound to its placeholders.
        """
        sql = "SELECT name FROM sqlite_master WHERE type = '%s'" % kind
        if where:
            sql = '%s AND %s' % (sql, where)
        self._debug(label, sql)
        cursor = self._conn.cursor()
        try:
            cursor.execute(sql, params or ())
            # fetchall, not fetchmany: fetchmany() without a size returns only
            # cursor.arraysize (1 by default) rows.
            return [row[0] for row in cursor.fetchall()]
        finally:
            cursor.close()

    def initialize(self):
        """Create the ``documents`` table if it does not exist yet."""
        statements = [
            # "INTEGER PRIMARY KEY" (exactly) makes row_id an alias for the
            # rowid, so SQLite assigns it automatically on insert.
            """CREATE TABLE IF NOT EXISTS documents (
                row_id INTEGER PRIMARY KEY,
                id char(32) unique,
                contents blob,
                updated datetime,
                created datetime
            )""",
        ]
        for stmt in statements:
            self._debug('initialize: ', stmt)
            self._run(stmt)

    def create_index(self, column, unique=False):
        """Create an index on the ``value`` column of ``column``'s table.

        The table and index names come from the owning db object's
        ``_table_name`` / ``_index_name``.  ``unique=True`` creates a UNIQUE
        index.
        """
        ready_sql = 'CREATE %sINDEX %s ON %s (value)' % (
            'UNIQUE ' if unique else '',
            self._db._index_name(column),
            self._db._table_name(column),
        )
        self._debug('backend.create_index: ', ready_sql)
        self._run(ready_sql)

    def drop_index(self, table, column):
        """Drop the index named ``<table>_<column>_idx`` if it exists."""
        self._run('DROP INDEX IF EXISTS %s_%s_idx' % (table, column))

    def create_table(self, name, columns):
        """Create table ``name``; ``columns`` is an iterable of column
        definitions such as ``'value blob'``.
        """
        self._run('CREATE TABLE %s (%s)' % (name, ','.join(columns)))

    def drop_table(self, name):
        """Drop table ``name`` if it exists."""
        self._run('DROP TABLE IF EXISTS %s' % name)

    def query(self, sql, params=None, limit=None):
        """Run a SELECT and return its rows.

        limit == 1    -> a single row tuple, or None when nothing matches
        limit is None -> a list of all rows
        otherwise     -> a list of at most ``limit`` rows
        """
        cursor = self._conn.cursor()
        try:
            args = [sql]
            if params:
                args.append(params)
            self._debug('query args: ', args)
            cursor.execute(*args)
            if limit == 1:
                return cursor.fetchone()
            if limit is None:
                return cursor.fetchall()
            return cursor.fetchmany(limit)
        finally:
            cursor.close()

    def execute(self, sql, params=None):
        """Execute a statement that returns no rows and commit it."""
        self._run(sql, params)

    def insert(self, table, col_values):
        """Insert one row into ``table``.

        ``col_values`` maps column names to values; the values are bound as
        parameters.
        """
        columns = list(col_values)
        values = [col_values[col] for col in columns]
        new_sql = 'INSERT INTO %s (%s) VALUES (%s)' % (
            table,
            ','.join(columns),
            ','.join('?' for _ in columns),
        )
        self._debug('insert: ', new_sql, values)
        return self.execute(new_sql, params=values)

    def get_indexes(self, where=None, params=None):
        """Return the names of all indexes, optionally filtered by the SQL
        condition ``where`` (with ``params`` bound to its placeholders).
        """
        return self._schema_names('index', 'get_indexes:', where, params)

    def get_tables(self, where=None, params=None):
        """Return the names of all tables, optionally filtered by the SQL
        condition ``where`` (with ``params`` bound to its placeholders).
        """
        return self._schema_names('table', 'get_tables:', where, params)
class Database(object):
    """Schemaless document store on top of a SQL backend.

    Documents are dictionaries serialised as JSON into the ``documents``
    table.  Each indexed key gets its own ``document_<key>_value`` table
    (``document_id``, ``value``) with an index named
    ``document_<key>_value_idx`` on ``value``.
    """

    def __init__(self, db_type='sqlite3', db_name=''):
        """Create the backend.

        db_type -- accepted for compatibility; the value is not used.
        db_name -- database path passed to the backend as its ``db_name``
                   keyword.
        """
        self._backend = Backend(self, db_name=db_name)

    def initialize(self):
        """Ask the backend to set up its schema."""
        return self._backend.initialize()

    def _timestamp(self):
        """Return the current UTC time as ``YYYY-MM-DD HH:MM:SS[.ffffff]``.

        Stored as text so the value does not depend on sqlite3's implicit
        datetime adapter.
        """
        return datetime.datetime.utcnow().isoformat(' ')

    def insert(self, spec):
        """Insert the dictionary ``spec`` as a new document; return its id.

        A fresh 32-character hex id is generated and also written into
        ``spec['_id']``.  A row is added to the index table of every key of
        ``spec`` that has an index.

        Raises TypeError if ``spec`` is not a dictionary.
        """
        if not isinstance(spec, dict):
            raise TypeError('spec must be a dictionary')
        new_id = uuid.uuid1().hex
        spec['_id'] = new_id
        document = {'id': new_id,
                    'contents': json.dumps(spec),
                    'created': self._timestamp()}
        self._backend.insert('documents', document)
        for idx in self.get_indexes_for_columns(spec.keys()):
            column = self._column_from_index(idx)
            self._backend.insert(self._table_name(column),
                                 {'document_id': new_id,
                                  'value': spec[column]})
        return new_id

    def update(self, id, spec):
        """Replace the contents of document ``id`` with ``spec``.

        ``spec['_id']`` is set to ``id``.  Index rows are updated or created
        for the indexed keys in ``spec`` and removed for indexed keys the
        document no longer has.  Updating an id that does not exist is a
        no-op, mirroring ``delete``.

        Raises TypeError if ``spec`` is not a dictionary.
        """
        if not isinstance(spec, dict):
            raise TypeError('spec must be a dictionary')
        spec['_id'] = id
        previous = self.get(id)
        if previous is None:
            return
        self._backend.execute(
            'UPDATE documents SET contents = ?, updated = ? WHERE id = ?',
            params=(json.dumps(spec), self._timestamp(), id))
        for idx in self.get_indexes_for_columns(spec.keys()):
            column = self._column_from_index(idx)
            table = self._table_name(column)
            # Update-or-insert by hand: "INSERT OR REPLACE" would silently
            # evict another document's row on a UNIQUE value index instead
            # of raising.
            existing = self._backend.query(
                'SELECT 1 FROM %s WHERE document_id = ?' % table,
                params=(id,), limit=1)
            if existing:
                self._backend.execute(
                    'UPDATE %s SET value = ? WHERE document_id = ?' % table,
                    params=(spec[column], id))
            else:
                self._backend.insert(table, {'document_id': id,
                                             'value': spec[column]})
        # Keys dropped by this update must not keep matching in get().
        removed = [key for key in previous if key not in spec]
        for idx in self.get_indexes_for_columns(removed):
            table = self._table_name(self._column_from_index(idx))
            self._backend.execute(
                'DELETE FROM %s WHERE document_id = ?' % table,
                params=(id,))

    def delete(self, id):
        """Delete document ``id`` and its index rows; no-op if it is absent."""
        document = self.get(id)
        if not document:
            return
        for idx in self.get_indexes_for_columns(document.keys()):
            table = self._table_name(self._column_from_index(idx))
            self._backend.execute(
                'DELETE FROM %s WHERE document_id = ?' % table,
                params=(id,))
        self._backend.execute('DELETE FROM documents WHERE id = ?',
                              params=(id,))

    def get(self, spec):
        """Return the first document matching ``spec``, or None.

        ``spec`` is either a document id (str) or a dictionary.  For a
        dictionary, ``_id`` matches the document id exactly and every key
        that has an index is matched against its index table with SQL
        ``LIKE``; keys without an index are ignored.

        Raises TypeError for any other type of ``spec``.
        """
        if isinstance(spec, str):
            ready_sql = "SELECT contents FROM documents WHERE id = ?"
            params = [spec]
        elif isinstance(spec, dict):
            joins = []
            wheres = ['1 = 1']
            params = []
            if '_id' in spec:
                wheres.append('documents.id = ?')
                params.append(spec['_id'])
            for idx in self.get_indexes_for_columns(spec.keys()):
                column_name = self._column_from_index(idx)
                table_name = self._table_name(column_name)
                joins.append(
                    'INNER JOIN %(n)s ON %(n)s.document_id = documents.id'
                    % {'n': table_name})
                wheres.append('%s.value like ?' % table_name)
                params.append(spec[column_name])
            ready_sql = 'SELECT contents FROM documents %s WHERE %s' % (
                ' '.join(joins), ' AND '.join(wheres))
        else:
            raise TypeError("get must be given an id or a spec")
        if DEBUG:
            print('get:  %s %s' % (ready_sql, params))
        record = self._backend.query(ready_sql, params=params, limit=1)
        if record:
            return json.loads(record[0])
        return None

    def all(self):
        """Not implemented yet; currently returns None.

        Intended to return a cursor over all documents that can then be
        filtered.
        """
        pass

    def create_index(self, column, unique=False):
        """Create the index table for ``column`` and an index on its value."""
        self._backend.create_table(self._table_name(column),
                                   ('document_id char(32) primary key',
                                    'value blob'))
        self._backend.create_index(column, unique=unique)

    def drop_index(self, column):
        """Drop the index and the index table created for ``column``."""
        # The backend appends "_<column>_idx", so this names
        # document_<column>_value_idx.
        self._backend.drop_index('document_%s' % column, 'value')
        self._backend.drop_table(self._table_name(column))

    def get_indexes_for_columns(self, columns):
        """Return the index names that exist for the given column names."""
        found = []
        for col in columns:
            # Bind the name as a parameter instead of quoting it into SQL.
            found.extend(self._backend.get_indexes(
                'name = ?', (self._index_name(col),)))
        return found

    def close(self):
        """Close the backend."""
        self._backend.close()

    def _index_name(self, col):
        """Name of the index for column ``col``."""
        return 'document_%s_value_idx' % col

    def _table_name(self, table):
        """Name of the index table for column ``table``."""
        return 'document_%s_value' % table

    def _column_from_index(self, idx):
        """Inverse of ``_index_name``: strip 'document_' and '_value_idx'."""
        return idx[9:-10]

    def _table_from_index(self, idx):
        """Index table name for index ``idx``: strip the '_idx' suffix."""
        return idx[0:-4]
class Cursor(object):
    """Unfinished query cursor bound to a database object.

    Only the constructor keeps state; every other method is a stub that
    returns None.
    """

    def __init__(self, db):
        """Remember the database this cursor belongs to."""
        self._db = db

    def where(self, spec):
        """Stub: accepts a filter spec and does nothing."""

    def range(self, max_or_start, max=None):
        """Stub: derive a (start, max) pair and discard it.

        With a truthy ``max`` the first argument is the start; otherwise the
        start is 0 and the first argument is the maximum.
        """
        if not max:
            start, max = 0, max_or_start
        else:
            start = max_or_start

    def sort(self, desc):
        """Stub: accepts a sort direction and does nothing."""

    def count(self):
        """Stub: does nothing and returns None."""
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment