Advertisement
Guest User

Untitled

a guest
Oct 16th, 2018
323
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 11.32 KB | None | 0 0
  1. import logging
  2. import time
  3.  
  4. import pystache
  5. from flask import make_response, request
  6. from flask_login import current_user
  7. from flask_restful import abort
  8. from redash import models, settings
  9. from redash.tasks import QueryTask, record_event
  10. from redash.permissions import require_permission, not_view_only, has_access, require_access, view_only
  11. from redash.handlers.base import BaseResource, get_object_or_404
  12. from redash.utils import (collect_query_parameters,
  13. collect_parameters_from_request,
  14. gen_query_hash,
  15. json_dumps,
  16. utcnow)
  17. from redash.tasks.queries import enqueue_query
  18.  
  19.  
  20. def error_response(message):
  21. return {'job': {'status': 4, 'error': message}}, 400
  22.  
  23.  
  24. #
  25. # Run a parameterized query synchronously and return the result
  26. # DISCLAIMER: Temporary solution to support parameters in queries. Should be
  27. # removed once we refactor the query results API endpoints and handling
  28. # on the client side. Please don't reuse in other API handlers.
  29. #
  30. def run_query_sync(data_source, parameter_values, query_text, max_age=0):
  31. query_parameters = set(collect_query_parameters(query_text))
  32. missing_params = set(query_parameters) - set(parameter_values.keys())
  33. if missing_params:
  34. raise Exception('Missing parameter value for: {}'.format(", ".join(missing_params)))
  35.  
  36. if query_parameters:
  37. query_text = pystache.render(query_text, parameter_values)
  38.  
  39. if max_age <= 0:
  40. query_result = None
  41. else:
  42. query_result = models.QueryResult.get_latest(data_source, query_text, max_age)
  43.  
  44. query_hash = gen_query_hash(query_text)
  45.  
  46. if query_result:
  47. logging.info("Returning cached result for query %s" % query_hash)
  48. return query_result
  49.  
  50. try:
  51. started_at = time.time()
  52. data, error = data_source.query_runner.run_query(query_text, current_user)
  53.  
  54. if error:
  55. logging.info('got bak error')
  56. logging.info(error)
  57. return None
  58.  
  59. run_time = time.time() - started_at
  60. query_result, updated_query_ids = models.QueryResult.store_result(data_source.org_id, data_source,
  61. query_hash, query_text, data,
  62. run_time, utcnow())
  63.  
  64. models.db.session.commit()
  65. return query_result
  66. except Exception as e:
  67. if max_age > 0:
  68. abort(404, message="Unable to get result from the database, and no cached query result found.")
  69. else:
  70. abort(503, message="Unable to get result from the database.")
  71. return None
  72.  
  73. def run_query(data_source, parameter_values, query_text, query_id, max_age=0):
  74. query_parameters = set(collect_query_parameters(query_text))
  75. missing_params = set(query_parameters) - set(parameter_values.keys())
  76. if missing_params:
  77. return error_response('Missing parameter value for: {}'.format(", ".join(missing_params)))
  78.  
  79. if data_source.paused:
  80. if data_source.pause_reason:
  81. message = '{} is paused ({}). Please try later.'.format(data_source.name, data_source.pause_reason)
  82. else:
  83. message = '{} is paused. Please try later.'.format(data_source.name)
  84.  
  85. return error_response(message)
  86.  
  87. if query_parameters:
  88. query_text = pystache.render(query_text, parameter_values)
  89.  
  90. if max_age == 0:
  91. query_result = None
  92. else:
  93. query_result = models.QueryResult.get_latest(data_source, query_text, max_age)
  94.  
  95. if query_result:
  96. return {'query_result': query_result.to_dict()}
  97. else:
  98. job = enqueue_query(query_text, data_source, current_user.id, metadata={"Username": current_user.email, "Query ID": query_id})
  99. return {'job': job.to_dict()}
  100.  
  101.  
  102. class QueryResultListResource(BaseResource):
  103. @require_permission('execute_query')
  104. def post(self):
  105. """
  106. Execute a query (or retrieve recent results).
  107.  
  108. :qparam string query: The query text to execute
  109. :qparam number query_id: The query object to update with the result (optional)
  110. :qparam number max_age: If query results less than `max_age` seconds old are available,
  111. return them, otherwise execute the query; if omitted or -1, returns
  112. any cached result, or executes if not available. Set to zero to
  113. always execute.
  114. :qparam number data_source_id: ID of data source to query
  115. """
  116. params = request.get_json(force=True)
  117. parameter_values = collect_parameters_from_request(request.args)
  118.  
  119. query = params['query']
  120. max_age = int(params.get('max_age', -1))
  121. query_id = params.get('query_id', 'adhoc')
  122.  
  123. data_source = models.DataSource.get_by_id_and_org(params.get('data_source_id'), self.current_org)
  124.  
  125. if not has_access(data_source.groups, self.current_user, not_view_only):
  126. return {'job': {'status': 4, 'error': 'You do not have permission to run queries with this data source.'}}, 403
  127.  
  128. self.record_event({
  129. 'action': 'execute_query',
  130. 'object_id': data_source.id,
  131. 'object_type': 'data_source',
  132. 'query': query
  133. })
  134. return run_query(data_source, parameter_values, query, query_id, max_age)
  135.  
  136.  
  137. ONE_YEAR = 60 * 60 * 24 * 365.25
  138.  
  139.  
  140. class QueryResultResource(BaseResource):
  141. @staticmethod
  142. def add_cors_headers(headers):
  143. if 'Origin' in request.headers:
  144. origin = request.headers['Origin']
  145.  
  146. if set(['*', origin]) & settings.ACCESS_CONTROL_ALLOW_ORIGIN:
  147. headers['Access-Control-Allow-Origin'] = origin
  148. headers['Access-Control-Allow-Credentials'] = str(settings.ACCESS_CONTROL_ALLOW_CREDENTIALS).lower()
  149.  
  150. @require_permission('view_query')
  151. def options(self, query_id=None, query_result_id=None, filetype='json'):
  152. headers = {}
  153. self.add_cors_headers(headers)
  154.  
  155. if settings.ACCESS_CONTROL_REQUEST_METHOD:
  156. headers['Access-Control-Request-Method'] = settings.ACCESS_CONTROL_REQUEST_METHOD
  157.  
  158. if settings.ACCESS_CONTROL_ALLOW_HEADERS:
  159. headers['Access-Control-Allow-Headers'] = settings.ACCESS_CONTROL_ALLOW_HEADERS
  160.  
  161. return make_response("", 200, headers)
  162.  
  163. @require_permission('view_query')
  164. def get(self, query_id=None, query_result_id=None, filetype='json'):
  165. """
  166. Retrieve query results.
  167.  
  168. :param number query_id: The ID of the query whose results should be fetched
  169. :param number query_result_id: the ID of the query result to fetch
  170. :param string filetype: Format to return. One of 'json', 'xlsx', or 'csv'. Defaults to 'json'.
  171.  
  172. :<json number id: Query result ID
  173. :<json string query: Query that produced this result
  174. :<json string query_hash: Hash code for query text
  175. :<json object data: Query output
  176. :<json number data_source_id: ID of data source that produced this result
  177. :<json number runtime: Length of execution time in seconds
  178. :<json string retrieved_at: Query retrieval date/time, in ISO format
  179. """
  180. # TODO:
  181. # This method handles two cases: retrieving result by id & retrieving result by query id.
  182. # They need to be split, as they have different logic (for example, retrieving by query id
  183. # should check for query parameters and shouldn't cache the result).
  184. should_cache = query_result_id is not None
  185.  
  186. parameter_values = collect_parameters_from_request(request.args)
  187. max_age = int(request.args.get('maxAge', 0))
  188.  
  189. query_result = None
  190.  
  191. if query_result_id:
  192. query_result = get_object_or_404(models.QueryResult.get_by_id_and_org, query_result_id, self.current_org)
  193.  
  194. if query_id is not None:
  195. query = get_object_or_404(models.Query.get_by_id_and_org, query_id, self.current_org)
  196.  
  197. if query_result is None and query is not None:
  198. if True and parameter_values:
  199. query_result = run_query_sync(query.data_source, parameter_values, query.query_text, max_age=max_age)
  200. elif query.latest_query_data_id is not None:
  201. query_result = get_object_or_404(models.QueryResult.get_by_id_and_org, query.latest_query_data_id, self.current_org)
  202.  
  203. if query is not None and query_result is not None and self.current_user.is_api_user():
  204. if query.query_hash != query_result.query_hash:
  205. abort(404, message='No cached result found for this query.')
  206.  
  207. if query_result:
  208. require_access(query_result.data_source.groups, self.current_user, view_only)
  209.  
  210. if isinstance(self.current_user, models.ApiUser):
  211. event = {
  212. 'user_id': None,
  213. 'org_id': self.current_org.id,
  214. 'action': 'api_get',
  215. 'api_key': self.current_user.name,
  216. 'file_type': filetype,
  217. 'user_agent': request.user_agent.string,
  218. 'ip': request.remote_addr
  219. }
  220.  
  221. if query_id:
  222. event['object_type'] = 'query'
  223. event['object_id'] = query_id
  224. else:
  225. event['object_type'] = 'query_result'
  226. event['object_id'] = query_result_id
  227.  
  228. record_event.delay(event)
  229.  
  230. if filetype == 'json':
  231. response = self.make_json_response(query_result)
  232. elif filetype == 'xlsx':
  233. response = self.make_excel_response(query_result)
  234. else:
  235. response = self.make_csv_response(query_result)
  236.  
  237. if len(settings.ACCESS_CONTROL_ALLOW_ORIGIN) > 0:
  238. self.add_cors_headers(response.headers)
  239.  
  240. if should_cache:
  241. response.headers.add_header('Cache-Control', 'private,max-age=%d' % ONE_YEAR)
  242.  
  243. return response
  244.  
  245. else:
  246. abort(404, message='No cached result found for this query.')
  247.  
  248. def make_json_response(self, query_result):
  249. data = json_dumps({'query_result': query_result.to_dict()})
  250. headers = {'Content-Type': "application/json"}
  251. return make_response(data, 200, headers)
  252.  
  253. @staticmethod
  254. def make_csv_response(query_result):
  255. headers = {'Content-Type': "text/csv; charset=UTF-8"}
  256. return make_response(query_result.make_csv_content(), 200, headers)
  257.  
  258. @staticmethod
  259. def make_excel_response(query_result):
  260. headers = {'Content-Type': "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"}
  261. return make_response(query_result.make_excel_content(), 200, headers)
  262.  
  263.  
  264. class JobResource(BaseResource):
  265. def get(self, job_id):
  266. """
  267. Retrieve info about a running query job.
  268. """
  269. job = QueryTask(job_id=job_id)
  270. return {'job': job.to_dict()}
  271.  
  272. def delete(self, job_id):
  273. """
  274. Cancel a query job in progress.
  275. """
  276. job = QueryTask(job_id=job_id)
  277. job.cancel()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement