Created
July 7, 2019 15:49
-
-
Save lemon24/5783a13e22af00b3442fe617e5d162d4 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import textwrap | |
import collections | |
class BaseQuery: | |
indent = ' ' | |
separators = collections.defaultdict( | |
lambda: ',', | |
WHERE=' AND', | |
) | |
def __init__(self): | |
self._query = [] | |
def _list_for_keyword(self, keyword): | |
try: | |
return self._query[self._query.index(keyword) + 1] | |
except ValueError: | |
rv = [] | |
self._query.extend((keyword, rv)) | |
return rv | |
def _extend_keyword(self, keyword, things): | |
self._list_for_keyword(keyword).extend(self._clean_up(t) for t in things) | |
def _clean_up(self, thing): | |
return textwrap.dedent(thing.rstrip()).strip() | |
def __getattr__(self, name): | |
keyword = name.replace('_', ' ').strip().upper() | |
def add(*things): | |
self._extend_keyword(keyword, things) | |
return self | |
# TODO: Set __doc__ on add. | |
return add | |
def __str__(self): | |
def make(): | |
pairs = zip(self._query[0::2], self._query[1::2]) | |
for keyword, things in pairs: | |
yield keyword | |
for i, thing in enumerate(things, 1): | |
if i < len(things): | |
thing += self.separators[keyword] | |
yield textwrap.indent(thing, self.indent) | |
yield ';' | |
return '\n'.join(make()) | |
class ScrollingWindowMixin: | |
def __init__(self): | |
super().__init__() | |
self._order_by_things = None | |
self._order_by_desc = None | |
def scrolling_window_order_by(self, *things, desc=False): | |
assert self._order_by_things is None | |
self._order_by_things = things | |
self._order_by_desc = desc | |
direction = 'DESC' if desc else 'ASC' | |
self.order_by(*( | |
'{} {}'.format(self._clean_up(thing), direction) | |
for thing in things | |
)) | |
return self | |
def _make_last_label(self, i): | |
return ":last_{}".format(i) | |
def scrolling_window_limit(self, thing): | |
assert self._order_by_things is not None | |
self.limit(thing) | |
self._where( | |
"(\n{}\n) {} (\n{}\n)".format( | |
textwrap.indent( | |
',\n'.join(self._order_by_things), | |
self.indent, | |
), | |
'<' if self._order_by_desc else '>', | |
textwrap.indent( | |
',\n'.join( | |
self._make_last_label(i) | |
for i in range(len(self._order_by_things)) | |
), | |
self.indent, | |
), | |
), | |
) | |
return self | |
def extract_last(self, result): | |
if not self._order_by_things: | |
return {} | |
names = self.select_names | |
assert len(result) == len(names) | |
return collections.OrderedDict( | |
(self._make_last_label(i), result[names.index(thing)]) | |
for i, thing in enumerate(self._order_by_things) | |
) | |
class SelectAliasMixin: | |
def __init__(self): | |
super().__init__() | |
self._select_names = [] | |
def _super_select(self, *things): | |
try: | |
fn = super().select | |
except AttributeError: | |
# AttributeError: 'super' object has no attribute 'select' | |
fn = super().__getattr__('select') | |
return fn(*things) | |
def select(self, *things): | |
rv = self._super_select(*things) | |
self._select_names.extend(self._clean_up(t) for t in things) | |
return rv | |
def select_alias(self, alias, thing): | |
alias = self._clean_up(alias) | |
thing = self._clean_up(thing) | |
rv = self._super_select("{} AS {}".format(thing, alias)) | |
self._select_names.append(alias) | |
return rv | |
@property | |
def select_names(self): | |
return list(self._select_names) | |
class Query(SelectAliasMixin, ScrollingWindowMixin, BaseQuery): pass | |
def _make_better_query( | |
which, feed_url, has_enclosures, chunk_size, last, entry_id, search | |
): | |
query = ( | |
Query() | |
.select('feeds.url', 'entries.id') | |
.from_('entries', 'feeds') | |
.where('feeds.url = entries.feed') | |
) | |
if which == 'all': | |
pass | |
elif which == 'unread': | |
query.where("(entries.read IS NULL OR entries.read != 1)") | |
elif which == 'read': | |
query.where("entries.read = 1") | |
else: | |
assert False, "shouldn't get here" # pragma: no cover | |
if feed_url: | |
query.where("feeds.url = :feed_url") | |
if entry_id: | |
query.where("entries.id = :entry_id") | |
if has_enclosures is not None: | |
query.select_alias("has_enclosures", """ | |
(json_array_length(entries.enclosures) IS NULL | |
OR json_array_length(entries.enclosures) = 0 | |
) | |
""") | |
query.where("AND {}has_enclosures".format('' if has_enclosures else "NOT ")) | |
if not search: | |
query.select_alias("kinda_first_updated", """ | |
coalesce( | |
CASE | |
WHEN | |
coalesce(entries.published, entries.updated) >= :recent_threshold | |
THEN entries.first_updated | |
END, | |
entries.published, entries.updated | |
) | |
""") | |
query.select_alias("kinda_published", "coalesce(entries.published, entries.updated)") | |
query.scrolling_window_order_by(*""" | |
kinda_first_updated | |
kinda_published | |
feeds.url | |
entries.id | |
""".split(), desc=True) | |
else: | |
# How hard would it be to add this to _make_get_entries_query? | |
query.select_alias("search_details", "fancy query returning search details as json") | |
query.select("entries_search.rank") | |
query.scrolling_window_order_by("entries_search.rank") | |
if chunk_size: | |
query.scrolling_window_limit(':chunk_size') | |
return query | |
args = ('all', 'feed', 0, 4, list(range(5)), 'entry', "search") | |
query = _make_better_query(*args) | |
print(query) | |
print() | |
print(query.select_names) | |
print(query.extract_last(('feed', 'id', "has_enclosures", "details", "rank"))) | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
SELECT | |
feeds.url, | |
entries.id, | |
(json_array_length(entries.enclosures) IS NULL | |
OR json_array_length(entries.enclosures) = 0 | |
) AS has_enclosures, | |
fancy query returning search details as json AS search_details, | |
entries_search.rank | |
FROM | |
entries, | |
feeds | |
WHERE | |
feeds.url = entries.feed AND | |
feeds.url = :feed_url AND | |
entries.id = :entry_id AND | |
AND NOT has_enclosures AND | |
( | |
entries_search.rank | |
) > ( | |
:last_0 | |
) | |
ORDER BY | |
entries_search.rank ASC | |
LIMIT | |
:chunk_size | |
; | |
['feeds.url', 'entries.id', 'has_enclosures', 'search_details', 'entries_search.rank'] | |
OrderedDict([(':last_0', 'rank')]) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment