Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
# Get everything! Handles every single possible select query required
# by the app.
# Note: field_map is not the nicest thing in the world;
# it should be replaced by something wiser sometime!
# Comparison keywords accepted in ``filter`` entries, mapped to SQL operators.
_COMPARISON_OPS = {'lt': '<', 'gt': '>', 'le': '<=', 'ge': '>=', 'eq': '='}

# The only sort directions that may be emitted into an ORDER BY clause.
_SORT_DIRECTIONS = ('ASC', 'DESC')


def _sql_field(name, field_map):
    """Return the SQL expression to use for the client-supplied field `name`.

    Names present in `field_map` are translated to the (server-defined)
    mapped value. Any other name ends up verbatim in the SQL text, so it is
    only accepted when it is a plain, optionally dotted, identifier.

    Raises:
        ValueError: if an unmapped `name` is not a plain identifier.
    """
    if name in field_map:
        return field_map[name]
    import re  # local import: the file's import block is not visible here
    try:
        is_identifier = re.match(
            r'[A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*)*\Z', name)
    except TypeError:
        # not a string at all (e.g. a JSON number or list)
        is_identifier = None
    if not is_identifier:
        raise ValueError('invalid field name: %r' % (name,))
    return name


def _order_by_from_sort(sort_json, field_map):
    """Turn the JSON `sort` parameter into an ORDER BY expression string.

    `sort_json` must decode to a list of objects carrying 'property' and
    'direction' keys.

    Raises:
        ValueError: on a direction other than ASC/DESC (case-insensitive)
            or an invalid unmapped property name.
    """
    terms = []
    for sorter in json.loads(sort_json):
        try:
            direction = sorter['direction'].upper()
        except AttributeError:
            direction = None
        if direction not in _SORT_DIRECTIONS:
            raise ValueError(
                'invalid sort direction: %r' % (sorter['direction'],))
        # emit the normalized keyword, never the raw client text
        terms.append(
            '%s %s' % (_sql_field(sorter['property'], field_map), direction))
    return ','.join(terms)


def _like_patterns(text):
    """Return the set of '%word%' patterns for each whitespace-separated word."""
    return {'%%%s%%' % word for word in text.split()}


def query(g, params, tables, query_fields=None, query_op='ilike', what='*',
          join=None, where=None, field_map=None, group_by=None, order_by=None,
          include_total=True, debug=False):
    """Run the select query described by `params` and return a JSON-ready dict.

    Args:
        g: object exposing `g.db.cursor()`.
        params: mapping of request parameters. Keys read here:
            'sort'   - JSON list of {'property', 'direction'} objects;
            'filter' - JSON list of {'field', 'type', 'value'} objects with
                       an optional 'comparison' key (lt/gt/le/ge/eq,
                       default 'eq');
            'query'  - free-text value matched against `query_fields`;
            'start', 'limit' - passed to `pg.select` as offset and limit.
        tables: passed through to `pg.count` / `pg.select`.
        query_fields: tuple of field expressions searched by 'query'.
        query_op: operator used for 'query'. 'like'/'ilike' match every
            word against the concatenation of all `query_fields`; any other
            operator requires exactly one query field.
        what, join, group_by: passed through to `pg.count` / `pg.select`.
        where: base conditions. It is copied, never modified; the
            conditions derived from `params` are added to the copy.
        field_map: maps client field names to SQL expressions. Names absent
            from the map are used as-is and must be plain identifiers.
        order_by: ORDER BY expression; must be None when `params` has 'sort'.
        include_total: when true (and `debug` is false), also run
            `pg.count` and return its result under 'total'.
        debug: forwarded to `pg.select` as `debug_assert`; also suppresses
            the count query.

    Returns:
        dict with 'success' (always True), 'rows' (the `pg.select` result)
        and, when counted, 'total'.

    Raises:
        TypeError: if `query_fields` is not a tuple.
        ValueError: if `order_by` is given together with a 'sort'
            parameter, if a sort direction or unmapped field name is
            invalid, or if a non-LIKE `query_op` is used with a number of
            query fields other than one.
        KeyError: if a sort/filter entry lacks a required key or names an
            unknown comparison.
    """
    if query_fields is None:
        query_fields = ()
    # a bare string here would silently be iterated character by character
    if not isinstance(query_fields, tuple):
        raise TypeError('query_fields must be a tuple, got %r'
                        % (query_fields,))
    if join is None:
        join = {}
    # work on a copy so the caller's dict is not polluted with the
    # request-specific conditions added below
    where = {} if where is None else where.copy()
    if field_map is None:
        field_map = {}

    cursor = g.db.cursor()

    if 'sort' in params:
        if order_by is not None:
            raise ValueError(
                "order_by cannot be combined with a 'sort' parameter")
        order_by = _order_by_from_sort(params['sort'], field_map)

    if params.get('filter', '').strip():
        for filter_arg in json.loads(params['filter']):
            field = _sql_field(filter_arg['field'], field_map)
            filter_type = filter_arg['type']
            if filter_type == 'string':
                # one pattern per word; the 'unaccent' marker is presumably
                # interpreted by pg as accent-insensitive matching
                where[(field, 'ilike', 'unaccent')] = _like_patterns(
                    filter_arg['value'])
            elif filter_type == 'list':
                where[field] = tuple(filter_arg['value'])
            else:
                value = filter_arg['value']
                comparison = filter_arg.get('comparison', 'eq')
                where[(field, _COMPARISON_OPS[comparison])] = value

    if params.get('query', '').strip():
        if query_op.lower() in ('ilike', 'like'):
            # autocomplete query: coalesce is needed because the concat
            # operator (||) doesn't work with null values
            concatenated = '||'.join(
                "coalesce(%s::text, '')" % ff for ff in query_fields)
            where[(concatenated, query_op, 'unaccent')] = _like_patterns(
                params['query'])
        else:
            # exact field query: only 1 field, because there is only
            # 1 query value
            if len(query_fields) != 1:
                raise ValueError(
                    'query_op %r requires exactly one query field'
                    % (query_op,))
            where[(query_fields[0], query_op)] = params['query']

    json_out = {'success': True}
    if include_total and not debug:
        json_out['total'] = pg.count(cursor, tables, what=what, join=join,
                                     where=where, group_by=group_by,
                                     debug_assert=False)
    json_out['rows'] = pg.select(cursor, tables, what=what, join=join,
                                 where=where, offset=params.get('start', None),
                                 limit=params.get('limit', None) or None,
                                 group_by=group_by, order_by=order_by,
                                 debug_assert=debug)
    return json_out
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement