Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from sqlalchemy.ext.compiler import compiles
- from sqlalchemy.sql.expression import FromClause, column, Select
- class CommonTableExpression(FromClause):
- """Represent the 'inside' of a common table
- expression."""
- def __init__(self, name, colnames):
- self.name = name
- self.colnames = colnames
- def _populate_column_collection(self):
- self._columns.update(
- (name, column(name))
- for name in self.colnames
- )
- @classmethod
- def create(cls, name, colnames):
- target = CommonTableExpression(name, colnames)
- class ctx(object):
- def __enter__(self):
- return target
- def __exit__(self, *arg, **kw):
- pass
- return ctx()
- class SelectFromCTE(FromClause):
- """Represent the 'outside' of the CTE.
- Ultimately this would be integrated into Select
- itself, since we just want a Select with an
- extra clause on top. "CommonTableExpression" objects
- would be pulled from the FROM clause
- and rendered on top.
- """
- def __init__(self, inner_thing, stmt):
- self.inner_thing = inner_thing
- self.stmt = stmt
- def _populate_column_collection(self):
- for name, c in zip(self.inner_thing.colnames, self.stmt.c):
- c._make_proxy(self, name)
- @compiles(CommonTableExpression)
- def _recur_inner_thing(element, compiler, **kw):
- return element.name
- @compiles(SelectFromCTE)
- def _recur_outer_thing(element, compiler, **kw):
- cte = (
- "WITH %s(%s) AS (\n"
- "%s\n"
- ")\n"% (
- element.inner_thing.name,
- ", ".join(element.inner_thing.colnames),
- compiler.process(element.stmt, **kw) )
- )
- compiler._cte = cte
- text = "SELECT * FROM %s" % element.inner_thing.name
- # FIXME
- if kw.get('asfrom'):
- text = "(%s) as x" % text
- return text
- @compiles(Select)
- def visit_select_cte(element, compiler, **kw):
- text = compiler.visit_select(element)
- if hasattr(compiler, '_cte') and not bool(compiler.stack):
- text = compiler._cte + text + "\n OPTION (MAXRECURSION 0)"
- if kw.get('asfrom'):
- text = "(%s)" % text
- return text
- from sqlalchemy import select, DateTime, Integer
- from sqlalchemy.sql.expression import func, cast, bindparam, text
- columns = ['date', 'year', 'month', 'day', 'day_of_week', 'day_of_year',
- 'week', 'quarter']
- def column_exps(exp):
- return [ exp, func.YEAR(exp), func.MONTH(exp), func.DAY(exp),
- func.DATEPART( text('dw'), exp),
- func.DATEPART( text('dy'), exp),
- func.DATEPART( text('wk'), exp),
- func.DATEPART( text('q'), exp) ]
- with CommonTableExpression.create('all_dates', columns) as all_dates:
- start_exp = cast(bindparam('start', type_=DateTime), DateTime)
- stop_exp = cast(bindparam('stop', type_=DateTime), DateTime)
- step_exp = bindparam('step', type_=Integer)
- next_exp = func.DATEADD( text('dd'), step_exp, all_dates.c.date )
- s1 = select(column_exps(start_exp))
- s2 = select(column_exps(next_exp), from_obj=all_dates).where(next_exp <= stop_exp)
- s = s1.union_all(s2)
- all_dates = SelectFromCTE(all_dates, s)
- from sqlalchemy.orm import mapper
- class Date(object):
- query = Session.query_property()
- @classmethod
- def range(cls, start, stop, step=1):
- return cls.query.params(start=start, stop=stop, step=step)
- mapper(Date, all_dates, primary_key=[all_dates.c.date])
- query = Date.range('2011-01-01 00:00:00', '2012-02-01 00:00:00') \
- .filter(Date.day_of_week == 7) \
- .order_by(Date.date.desc())
- for d in query:
- print d.date, "\t", d.year, "\t", d.month, "\t", d.day, "\t", \
- d.day_of_week, "\t", d.day_of_year, "\t", d.week, "\t", d.quarter
Add Comment
Please, Sign In to add comment