Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- <?php
- namespace Sensphere\Cassandra;
- use Amp\Loop;
- use Cassandra\Exception\TimeoutException;
- use SC_Profiler;
/**
 * Runs Cassandra queries without blocking the Amp event loop.
 *
 * The query is submitted through the driver's asynchronous API and the
 * resulting driver future is polled from a repeating event-loop watcher until
 * it completes; the outcome is exposed to callers as an Amp promise.
 */
class AsyncQuery {
    /** Interval handed to Loop::repeat() between two polls (milliseconds per the Amp documentation). */
    const POLL_INTERVAL_MS = 1;

    /** Maximum wait handed to the driver future on each poll (seconds per the driver documentation). */
    const POLL_TIMEOUT_SECONDS = 0.0001;

    /** @var mixed Schema identifier passed to ConnectionManager::getConnection() and to the profiler. */
    protected $schema;

    /**
     * @param mixed $schema Schema identifier used to look up the connection for every query.
     */
    public function __construct($schema) {
        $this->schema = $schema;
    }

    /**
     * Submits a query asynchronously and returns a promise for its result.
     *
     * When a profiler instance is available, the query is registered with it
     * before submission and the profiler entry is closed once the query
     * finishes, whether it succeeded or failed.
     *
     * @param mixed $query Statement forwarded unchanged to the connection's executeAsync().
     * @param mixed|null $options Execution options; null is replaced by an empty array.
     * @return \Amp\Promise Resolved with the value returned by the driver future, or failed
     *                      with whatever was thrown while waiting for it.
     */
    public function query($query, $options = null) {
        $connection = ConnectionManager::getInstance()->getConnection($this->schema);

        $profiler = SC_Profiler::getInstance();
        $profileQuery = null;
        if ($profiler) {
            $profileQuery = $profiler->startQuery($query, 'cassandra', $this->schema, true);
        }

        $future = $connection->executeAsync($query, $options ?? []);
        $deferred = new \Amp\Deferred();

        // NOTE(review): $connection is not used inside the callback; it stays captured as in
        // the original, presumably to keep the session alive while the future is pending - confirm.
        Loop::repeat(self::POLL_INTERVAL_MS, function ($watcher) use ($connection, $deferred, $future, $profiler, $profileQuery) {
            $result = null;
            $error = null;

            try {
                $result = $future->get(self::POLL_TIMEOUT_SECONDS);
            }
            catch (TimeoutException $e) {
                // Not finished yet: keep the watcher alive and poll again on the next tick.
                return;
            }
            catch (\Throwable $e) {
                // Catch \Throwable, not only \Exception, so an \Error cannot leave the
                // watcher running forever with a promise that never settles.
                $error = $e;
            }

            // The future has settled one way or the other: stop polling first.
            Loop::cancel($watcher);

            // Close the profiler entry on success AND on failure, using the same profiler
            // instance that opened it.
            if ($profiler) {
                try {
                    $profiler->stopQuery($profileQuery);
                }
                catch (\Throwable $profilerError) {
                    // Keep the query's own error when there is one; otherwise surface
                    // the profiler failure through the promise.
                    $error = $error ?? $profilerError;
                }
            }

            if ($error !== null) {
                $deferred->fail($error);
                return;
            }

            $deferred->resolve($result);
        });

        return $deferred->promise();
    }
}
Add Comment
Please, Sign In to add comment