Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import textwrap
- import collections
- class BaseQuery:
- indent = ' '
- separators = collections.defaultdict(
- lambda: ',',
- WHERE=' AND',
- )
- def __init__(self):
- self._query = []
- def _list_for_keyword(self, keyword):
- try:
- return self._query[self._query.index(keyword) + 1]
- except ValueError:
- rv = []
- self._query.extend((keyword, rv))
- return rv
- def _extend_keyword(self, keyword, things):
- self._list_for_keyword(keyword).extend(self._clean_up(t) for t in things)
- def _clean_up(self, thing):
- return textwrap.dedent(thing.rstrip()).strip()
- def __getattr__(self, name):
- keyword = name.replace('_', ' ').strip().upper()
- def add(*things):
- self._extend_keyword(keyword, things)
- return self
- # TODO: Set __doc__ on add.
- return add
- def __str__(self):
- def make():
- pairs = zip(self._query[0::2], self._query[1::2])
- for keyword, things in pairs:
- yield keyword
- for i, thing in enumerate(things, 1):
- if i < len(things):
- thing += self.separators[keyword]
- yield textwrap.indent(thing, self.indent)
- yield ';'
- return '\n'.join(make())
- class ScrollingWindowMixin:
- def __init__(self):
- super().__init__()
- self._order_by_things = None
- self._order_by_desc = None
- def scrolling_window_order_by(self, *things, desc=False):
- assert self._order_by_things is None
- self._order_by_things = things
- self._order_by_desc = desc
- direction = 'DESC' if desc else 'ASC'
- self.order_by(*(
- '{} {}'.format(self._clean_up(thing), direction)
- for thing in things
- ))
- return self
- def _make_last_label(self, i):
- return ":last_{}".format(i)
- def scrolling_window_limit(self, thing):
- assert self._order_by_things is not None
- self.limit(thing)
- self._where(
- "(\n{}\n) {} (\n{}\n)".format(
- textwrap.indent(
- ',\n'.join(self._order_by_things),
- self.indent,
- ),
- '<' if self._order_by_desc else '>',
- textwrap.indent(
- ',\n'.join(
- self._make_last_label(i)
- for i in range(len(self._order_by_things))
- ),
- self.indent,
- ),
- ),
- )
- return self
- def extract_last(self, result):
- if not self._order_by_things:
- return {}
- names = self.select_names
- assert len(result) == len(names)
- return collections.OrderedDict(
- (self._make_last_label(i), result[names.index(thing)])
- for i, thing in enumerate(self._order_by_things)
- )
- class SelectAliasMixin:
- def __init__(self):
- super().__init__()
- self._select_names = []
- def _super_select(self, *things):
- try:
- fn = super().select
- except AttributeError:
- # AttributeError: 'super' object has no attribute 'select'
- fn = super().__getattr__('select')
- return fn(*things)
- def select(self, *things):
- rv = self._super_select(*things)
- self._select_names.extend(self._clean_up(t) for t in things)
- return rv
- def select_alias(self, alias, thing):
- alias = self._clean_up(alias)
- thing = self._clean_up(thing)
- rv = self._super_select("{} AS {}".format(thing, alias))
- self._select_names.append(alias)
- return rv
- @property
- def select_names(self):
- return list(self._select_names)
- class Query(SelectAliasMixin, ScrollingWindowMixin, BaseQuery): pass
- def _make_better_query(
- which, feed_url, has_enclosures, chunk_size, last, entry_id, search
- ):
- query = (
- Query()
- .select('feeds.url', 'entries.id')
- .from_('entries', 'feeds')
- .where('feeds.url = entries.feed')
- )
- if which == 'all':
- pass
- elif which == 'unread':
- query.where("(entries.read IS NULL OR entries.read != 1)")
- elif which == 'read':
- query.where("entries.read = 1")
- else:
- assert False, "shouldn't get here" # pragma: no cover
- if feed_url:
- query.where("feeds.url = :feed_url")
- if entry_id:
- query.where("entries.id = :entry_id")
- if has_enclosures is not None:
- query.select_alias("has_enclosures", """
- (json_array_length(entries.enclosures) IS NULL
- OR json_array_length(entries.enclosures) = 0
- )
- """)
- query.where("AND {}has_enclosures".format('' if has_enclosures else "NOT "))
- if not search:
- query.select_alias("kinda_first_updated", """
- coalesce(
- CASE
- WHEN
- coalesce(entries.published, entries.updated) >= :recent_threshold
- THEN entries.first_updated
- END,
- entries.published, entries.updated
- )
- """)
- query.select_alias("kinda_published", "coalesce(entries.published, entries.updated)")
- query.scrolling_window_order_by(*"""
- kinda_first_updated
- kinda_published
- feeds.url
- entries.id
- """.split(), desc=True)
- else:
- # How hard would it be to add this to _make_get_entries_query?
- query.select_alias("search_details", "fancy query returning search details as json")
- query.select("entries_search.rank")
- query.scrolling_window_order_by("entries_search.rank")
- if chunk_size:
- query.scrolling_window_limit(':chunk_size')
- return query
- args = ('all', 'feed', 0, 4, list(range(5)), 'entry', "search")
- query = _make_better_query(*args)
- print(query)
- print()
- print(query.select_names)
- print(query.extract_last(('feed', 'id', "has_enclosures", "details", "rank")))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement