Skip to content

Instantly share code, notes, and snippets.

@lemon24
Created July 7, 2019 15:49
Show Gist options
  • Save lemon24/5783a13e22af00b3442fe617e5d162d4 to your computer and use it in GitHub Desktop.
Save lemon24/5783a13e22af00b3442fe617e5d162d4 to your computer and use it in GitHub Desktop.
import textwrap
import collections
class BaseQuery:
indent = ' '
separators = collections.defaultdict(
lambda: ',',
WHERE=' AND',
)
def __init__(self):
self._query = []
def _list_for_keyword(self, keyword):
try:
return self._query[self._query.index(keyword) + 1]
except ValueError:
rv = []
self._query.extend((keyword, rv))
return rv
def _extend_keyword(self, keyword, things):
self._list_for_keyword(keyword).extend(self._clean_up(t) for t in things)
def _clean_up(self, thing):
return textwrap.dedent(thing.rstrip()).strip()
def __getattr__(self, name):
keyword = name.replace('_', ' ').strip().upper()
def add(*things):
self._extend_keyword(keyword, things)
return self
# TODO: Set __doc__ on add.
return add
def __str__(self):
def make():
pairs = zip(self._query[0::2], self._query[1::2])
for keyword, things in pairs:
yield keyword
for i, thing in enumerate(things, 1):
if i < len(things):
thing += self.separators[keyword]
yield textwrap.indent(thing, self.indent)
yield ';'
return '\n'.join(make())
class ScrollingWindowMixin:
def __init__(self):
super().__init__()
self._order_by_things = None
self._order_by_desc = None
def scrolling_window_order_by(self, *things, desc=False):
assert self._order_by_things is None
self._order_by_things = things
self._order_by_desc = desc
direction = 'DESC' if desc else 'ASC'
self.order_by(*(
'{} {}'.format(self._clean_up(thing), direction)
for thing in things
))
return self
def _make_last_label(self, i):
return ":last_{}".format(i)
def scrolling_window_limit(self, thing):
assert self._order_by_things is not None
self.limit(thing)
self._where(
"(\n{}\n) {} (\n{}\n)".format(
textwrap.indent(
',\n'.join(self._order_by_things),
self.indent,
),
'<' if self._order_by_desc else '>',
textwrap.indent(
',\n'.join(
self._make_last_label(i)
for i in range(len(self._order_by_things))
),
self.indent,
),
),
)
return self
def extract_last(self, result):
if not self._order_by_things:
return {}
names = self.select_names
assert len(result) == len(names)
return collections.OrderedDict(
(self._make_last_label(i), result[names.index(thing)])
for i, thing in enumerate(self._order_by_things)
)
class SelectAliasMixin:
def __init__(self):
super().__init__()
self._select_names = []
def _super_select(self, *things):
try:
fn = super().select
except AttributeError:
# AttributeError: 'super' object has no attribute 'select'
fn = super().__getattr__('select')
return fn(*things)
def select(self, *things):
rv = self._super_select(*things)
self._select_names.extend(self._clean_up(t) for t in things)
return rv
def select_alias(self, alias, thing):
alias = self._clean_up(alias)
thing = self._clean_up(thing)
rv = self._super_select("{} AS {}".format(thing, alias))
self._select_names.append(alias)
return rv
@property
def select_names(self):
return list(self._select_names)
class Query(SelectAliasMixin, ScrollingWindowMixin, BaseQuery): pass
def _make_better_query(
which, feed_url, has_enclosures, chunk_size, last, entry_id, search
):
query = (
Query()
.select('feeds.url', 'entries.id')
.from_('entries', 'feeds')
.where('feeds.url = entries.feed')
)
if which == 'all':
pass
elif which == 'unread':
query.where("(entries.read IS NULL OR entries.read != 1)")
elif which == 'read':
query.where("entries.read = 1")
else:
assert False, "shouldn't get here" # pragma: no cover
if feed_url:
query.where("feeds.url = :feed_url")
if entry_id:
query.where("entries.id = :entry_id")
if has_enclosures is not None:
query.select_alias("has_enclosures", """
(json_array_length(entries.enclosures) IS NULL
OR json_array_length(entries.enclosures) = 0
)
""")
query.where("AND {}has_enclosures".format('' if has_enclosures else "NOT "))
if not search:
query.select_alias("kinda_first_updated", """
coalesce(
CASE
WHEN
coalesce(entries.published, entries.updated) >= :recent_threshold
THEN entries.first_updated
END,
entries.published, entries.updated
)
""")
query.select_alias("kinda_published", "coalesce(entries.published, entries.updated)")
query.scrolling_window_order_by(*"""
kinda_first_updated
kinda_published
feeds.url
entries.id
""".split(), desc=True)
else:
# How hard would it be to add this to _make_get_entries_query?
query.select_alias("search_details", "fancy query returning search details as json")
query.select("entries_search.rank")
query.scrolling_window_order_by("entries_search.rank")
if chunk_size:
query.scrolling_window_limit(':chunk_size')
return query
args = ('all', 'feed', 0, 4, list(range(5)), 'entry', "search")
query = _make_better_query(*args)
print(query)
print()
print(query.select_names)
print(query.extract_last(('feed', 'id', "has_enclosures", "details", "rank")))
SELECT
feeds.url,
entries.id,
(json_array_length(entries.enclosures) IS NULL
OR json_array_length(entries.enclosures) = 0
) AS has_enclosures,
fancy query returning search details as json AS search_details,
entries_search.rank
FROM
entries,
feeds
WHERE
feeds.url = entries.feed AND
feeds.url = :feed_url AND
entries.id = :entry_id AND
AND NOT has_enclosures AND
(
entries_search.rank
) > (
:last_0
)
ORDER BY
entries_search.rank ASC
LIMIT
:chunk_size
;
['feeds.url', 'entries.id', 'has_enclosures', 'search_details', 'entries_search.rank']
OrderedDict([(':last_0', 'rank')])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment