Created
September 18, 2012 04:01
-
-
Save flaneur2020/3741183 to your computer and use it in GitHub Desktop.
a query class for quixote the god damned.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # -*- encoding:utf-8 -*- | |
| # | |
| # 2012 by fleuria | |
| # Inspired by vino from lepture | |
| # | |
| import logging | |
| import collections | |
| import MySQLdb as mysql | |
# warning: a stub to execute mysql, write a correct one for yourself.
def _mysql_query(sql, *params):
    """Execute ``sql`` on a new MySQL connection and return the cursor.

    ``params`` holds the values for the ``%s`` placeholders in ``sql``.
    They may be given either as one sequence/dict
    (``_mysql_query(sql, [1, 2])``) or as separate positional values
    (``_mysql_query(sql, 1, 2)``); both forms reach ``cursor.execute`` as a
    single ``args`` value.

    Returns the ``DictCursor`` the statement was executed on, so the caller
    can ``fetchone()`` / ``fetchall()`` rows as dicts.

    NOTE: ``connect()`` is called without host or credentials, and the
    connection is never closed here -- this is only a stub.
    """
    # The module never defines a global ``logger``; get one explicitly so
    # the stub does not die with a NameError before touching the database.
    logger = logging.getLogger(__name__)
    logger.info(sql)
    logger.info(params)

    # ``cursor.execute`` accepts exactly one ``args`` value, so collapse the
    # varargs into it instead of splatting them into the call.
    if len(params) == 1 and isinstance(params[0], (list, tuple, dict)):
        args = params[0]
    else:
        args = params or None

    conn = mysql.connect(use_unicode=False)
    cursor = conn.cursor(mysql.cursors.DictCursor)
    cursor.execute(sql, args)
    conn.commit()
    return cursor
class Query(object):
    """Chainable builder for parameterised single-table ``select`` statements.

    Sample:
    -------
        query = Query(Question, {'name = %s': name}, 'id desc')
        questions = query.fetch(20, 10)
        count = query.count()

    Conditions are kept in ``self.conditions``, a dict that maps a SQL
    fragment containing ``%s`` placeholders to its parameter(s).  A list
    value supplies one parameter per placeholder; any other value is a
    single parameter.  All conditions are joined with ``and``.

    Only parameter values are passed separately to the driver; the table
    name, field list, ``order by`` text and condition fragments are pasted
    into the SQL verbatim.
    """

    # MySQL has no "offset only" syntax; its manual recommends a very large
    # row count to read everything after an offset.
    _MAX_ROWS = 18446744073709551615

    def __init__(self, klass, conditions=None, order_by=None, fields='*', table=None, **patterns):
        """Create a query for ``klass``.

        klass      -- class instantiated as ``klass(**row)`` for every row
                      returned by ``fetch()``.
        conditions -- optional initial ``{sql_fragment: params}`` dict.
        order_by   -- optional text placed after ``order by``.
        fields     -- select list, ``'*'`` by default.
        table      -- table name; defaults to the lower-cased class name
                      with an ``s`` appended.
        patterns   -- ``filter()``-style keyword conditions.
        """
        self.klass = klass
        self.table = table or (klass.__name__ + 's').lower()
        # Copy, so later filter()/where() calls never modify the caller's dict.
        self.conditions = dict(conditions) if conditions else {}
        self.order = order_by
        self.fields = fields
        self.filter(**patterns)

    def order_by(self, order):
        """Set the ``order by`` text and return ``self`` for chaining."""
        self.order = order
        return self

    def where(self, conditions):
        """Merge raw ``{sql_fragment: params}`` conditions into the query.

            query.where({'name = %s': name})
            query.where({'time > now()': []})

        A fragment that is already present has its parameters replaced.
        Returns ``self`` for chaining.
        """
        merged = dict(self.conditions)
        merged.update(conditions)
        self.conditions = merged
        return self

    def filter(self, **patterns):
        """Add conditions through a django-like keyword DSL.

        Only ``__gt``, ``__lt`` and ``__in`` suffixes are supported; any
        other key means equality.

            query.filter(id=12, name='fleuria')
            query.filter(id__in=[12, 21])
            query.filter(id__gt=12)

        An empty ``__in`` collection adds no condition at all.
        Returns ``self`` for chaining.
        """
        for key, params in patterns.items():
            field = key.split('__')[0]
            if key.endswith('__in'):
                # Store a real list: to_sql() expands only lists into one
                # parameter per placeholder, so tuples and sets must be
                # converted to stay in step with the '%s' count below.
                params = list(params)
                if not params:
                    continue
                cond = field + ' in (%s)' % ','.join(['%s'] * len(params))
            elif key.endswith('__gt'):
                cond = field + ' > %s'
            elif key.endswith('__lt'):
                cond = field + ' < %s'
            else:
                cond = key + ' = %s'
            self.conditions[cond] = params
        return self

    def fetch(self, count=None, offset=None):
        """Run the query and return a list of ``klass`` instances.

            query = Query(Post, order_by='id desc')
            query.filter(author_name='fleuria').fetch()      # fetch all posts
            query.filter(author_name='fleuria').fetch(10)    # fetch top 10 posts
            query.filter(author_name='fleuria').fetch(10, offset=100)  # fetch 10 posts, start at 100

        A falsy ``count`` (``None`` or ``0``) with no offset applies no limit.
        """
        if offset:
            # An offset needs a row count in MySQL; without one, read to the end.
            limits = (offset, self._MAX_ROWS if count is None else count)
        else:
            limits = count
        sql, params = self.to_sql(limits=limits)
        logging.getLogger(__name__).info('%s: %s', sql, params)
        # The helper defined in this module is ``_mysql_query``.
        cursor = _mysql_query(sql, params)
        rows = cursor.fetchall() or []
        return [self.klass(**row) for row in rows]

    def count(self):
        """Return the number of rows matching the current conditions."""
        sql, params = self.to_sql(count=True)
        cursor = _mysql_query(sql, params)
        row = cursor.fetchone() or {'count': 0}
        return row['count']

    def to_sql(self, limits=None, count=False):
        """Build the statement and return ``[sql, params]``.

        limits -- ``None`` or another falsy value for no limit, a single
                  row count, or an ``(offset, row_count)`` tuple/list.
        count  -- when true, select ``count(*) as count`` instead of
                  ``self.fields``.
        """
        where_sql = ''
        order_sql = ''
        limit_sql = ''
        params = []

        if self.conditions:
            where_sql = ' where ' + ' and '.join(self.conditions.keys())
            # keys() and values() iterate in the same order, so parameters
            # line up with the placeholders joined above.
            for value in self.conditions.values():
                if isinstance(value, list):
                    params.extend(value)
                else:
                    params.append(value)

        if self.order:
            order_sql = ' order by %s ' % self.order

        if limits:
            if isinstance(limits, (tuple, list)):
                limit_sql = " limit %s, %s "
                params.extend(limits)
            else:
                limit_sql = ' limit %s '
                params.append(limits)

        fields = 'count(*) as count' if count else self.fields
        sql = 'select %s from %s ' % (fields, self.table)
        sql = sql + where_sql + order_sql + limit_sql
        return [sql, params]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment