Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- <?php
- namespace Jsor\MysqlAsync;
- use React\EventLoop\LoopInterface;
- use React\Promise\Deferred;
- class Connection
- {
- private $loop;
- private $parameters;
- private $mysqli;
- private $jobs;
- private $connectDeferred;
- public function __construct(LoopInterface $loop, array $parameters = array(), array $options = array())
- {
- $this->loop = $loop;
- $this->parameters = $parameters;
- $this->jobs = new \SplQueue();
- $this->mysqli = mysqli_init();
- foreach ($options as $name => $option) {
- $this->mysqli->options($name, $option);
- }
- }
- public function connect()
- {
- if ($this->connectDeferred) {
- return $this->connectDeferred->promise();
- }
- $this->connectDeferred = new Deferred();
- $host = isset($this->parameters['host']) ? $this->parameters['host'] : null;
- $username = isset($this->parameters['username']) ? $this->parameters['username'] : null;
- $password = isset($this->parameters['password']) ? $this->parameters['password'] : null;
- $dbname = isset($this->parameters['dbname']) ? $this->parameters['dbname'] : null;
- $port = isset($this->parameters['port']) ? $this->parameters['port'] : null;
- $socket = isset($this->parameters['socket']) ? $this->parameters['socket'] : null;
- @$this->mysqli->real_connect($host, $username, $password, $dbname, $port, $socket);
- if ($this->mysqli->connect_error) {
- $this->connectDeferred->reject($this->mysqli->connect_error);
- } else {
- $this->connectDeferred->resolve($this);
- }
- return $this->connectDeferred->promise();
- }
- public function query($sql)
- {
- $job = (object) array(
- 'type' => 'query',
- 'sql' => $sql
- );
- return $this
- ->connect()
- ->then(function ($connection) use ($job) {
- return $connection->enqueue($job);
- });
- }
- public function end()
- {
- $job = (object) array(
- 'type' => 'close'
- );
- return $this
- ->connect()
- ->then(function ($connection) use ($job) {
- return $connection->enqueue($job);
- });
- }
- public function enqueue($job)
- {
- $deferred = new Deferred();
- $job->resolver = $deferred;
- $this->jobs->enqueue($job);
- if (1 === count($this->jobs)) {
- $this->start();
- }
- return $deferred->promise();
- }
- protected function start()
- {
- if ($this->jobs->isEmpty()) {
- return;
- }
- $job = $this->jobs->bottom();
- switch ($job->type) {
- case 'query':
- $this->mysqli->query($job->sql, \MYSQLI_ASYNC);
- $this->poll();
- break;
- case 'close':
- if ($this->mysqli->close()) {
- $job->resolver->resolve();
- } else {
- $job->resolver->reject(mysqli_error($this->mysqli));
- }
- $this->dequeue();
- break;
- }
- }
- public function poll()
- {
- $links = $errors = $reject = array($this->mysqli);
- if (0 === mysqli_poll($links, $errors, $reject, 1)) {
- $this->loop->addTimer(0.001, array($this, 'poll'));
- return;
- }
- $job = $this->jobs->bottom();
- $result = $links[0]->reap_async_query();
- if ($result) {
- $job->resolver->resolve($result);
- } else {
- $job->resolver->reject(mysqli_error($links[0]));
- }
- $this->dequeue();
- }
- protected function dequeue()
- {
- $this->jobs->dequeue();
- if ($this->jobs->isEmpty()) {
- //$this->emit('drain');
- } else {
- $this->start();
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement