Guest User

Untitled

a guest
Nov 18th, 2017
91
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.18 KB | None | 0 0
  1. <?php
  2.  
  3. namespace Sensphere\Cassandra;
  4.  
  5. use Amp\Loop;
  6. use Cassandra\Exception\TimeoutException;
  7. use SC_Profiler;
  8.  
  9. class AsyncQuery {
  10.  
  11. protected $schema;
  12.  
  13. public function __construct($schema) {
  14. $this->schema = $schema;
  15. }
  16.  
  17. /**
  18. * @param $query
  19. * @param null $options
  20. * @return \Amp\Promise
  21. */
  22. public function query($query, $options = null) {
  23. $connection = ConnectionManager::getInstance()->getConnection($this->schema);
  24. $profiler = SC_Profiler::getInstance();
  25. $profileQuery = null;
  26. if ($profiler) {
  27. $profileQuery = $profiler->startQuery($query, 'cassandra', $this->schema, true);
  28. }
  29.  
  30. $future = $connection->executeAsync($query, $options ?? []);
  31. $defered = new \Amp\Deferred();
  32.  
  33. Loop::repeat(1, function ($watcher) use ($connection, $defered, $query, $future, $profileQuery) {
  34. try {
  35. $result = $future->get(0.0001);
  36. $profiler = SC_Profiler::getInstance();
  37. if ($profiler) {
  38. $profiler->stopQuery($profileQuery);
  39. }
  40. Loop::cancel($watcher);
  41. $defered->resolve($result);
  42. }
  43. catch (TimeoutException $e) {
  44. // Continue
  45. }
  46. catch (\Exception $e) {
  47. Loop::cancel($watcher);
  48. $defered->fail($e);
  49. }
  50. });
  51.  
  52. return $defered->promise();
  53. }
  54.  
  55. }
Add Comment
Please, Sign In to add comment