Advertisement
Guest User

Pusher

a guest
Mar 18th, 2015
391
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
PHP 3.43 KB | None | 0 0
  1. <?php
  2.  
  3. namespace dizews\pushStream;
  4.  
  5. use Yii;
  6. use GuzzleHttp\Client;
  7. use GuzzleHttp\Stream\Utils;
  8. use yii\base\Application;
  9. use yii\base\Component;
  10. use yii\helpers\ArrayHelper;
  11.  
  12. class Pusher extends Component
  13. {
  14.     public $format = 'json';
  15.  
  16.     public $debug = false;
  17.  
  18.     /* @var Client */
  19.     public $client;
  20.  
  21.  
  22.     public $serverOptions = [
  23.         'useSsl' => false,
  24.         'host' => '127.0.0.1',
  25.         'port' => 80,
  26.         'path' => '/pub'
  27.     ];
  28.  
  29.     public $listenServerOptions = [
  30.         'path' => '/sub',
  31.         'modes' => 'stream'
  32.     ];
  33.  
  34.     public $flushAfterRequest = true;
  35.  
  36.     protected $channels = [];
  37.  
  38.  
  39.     public function init()
  40.     {
  41.         $this->listenServerOptions = ArrayHelper::merge($this->serverOptions, $this->listenServerOptions);
  42.         $this->client = new Client();
  43.  
  44.         if ($this->flushAfterRequest) {
  45.             Yii::$app->on(Application::EVENT_AFTER_REQUEST, function () {
  46.                 $this->flush();
  47.             });
  48.         }
  49.     }
  50.  
  51.  
  52.     /**
  53.      * publish event
  54.      *
  55.      * @param string $channel channel name
  56.      * @param string $event event name
  57.      * @param mixed $data body of event
  58.      * @return mixed
  59.      */
  60.     public function publish($channel, $event, $data)
  61.     {
  62.         $this->channels[$channel][] = [
  63.             'name' => $event,
  64.             'body' => $data
  65.         ];
  66.  
  67.         if (!$this->flushAfterRequest) {
  68.             return $this->flush();
  69.         }
  70.  
  71.         return true;
  72.     }
  73.  
  74.     /**
  75.      * flush all events onto endpoint
  76.      * @return mixed
  77.      */
  78.     public function flush()
  79.     {
  80.         $endpoint = $this->makeEndpoint($this->serverOptions);
  81.         Yii::trace('flush events of pusher');
  82.  
  83.         if ($this->channels) {
  84.             foreach ($this->channels as $channel => $events) {
  85.                 //send $payload into $endpoint
  86.                 $response = $this->client->post($endpoint, [
  87.                     'debug' => $this->debug,
  88.                     'query' => ['id' => $channel],
  89.                     $this->format => [
  90.                         'events' => $events,
  91.                     ]
  92.                 ]);
  93.             }
  94.  
  95.             $this->channels = [];
  96.             return $response->getBody()->getContents();
  97.         }
  98.     }
  99.  
  100.     /**
  101.      * listen endpoint
  102.      *
  103.      * @param $channels list of channels
  104.      * @param null $callback
  105.      */
  106.     public function listen($channels, $callback = null)
  107.     {
  108.         $endpoint = $this->makeEndpoint($this->listenServerOptions);
  109.         if (substr($this->listenServerOptions['path'], -1) != '/') {
  110.             $endpoint .= '/';
  111.         }
  112.         $endpoint .= implode(',', (array)$channels);
  113.  
  114.         $response = $this->client->get($endpoint, [
  115.             'debug' => $this->debug,
  116.             'stream' => true
  117.         ]);
  118.         $body = $response->getBody();
  119.  
  120.         while (!$body->eof()) {
  121.             if (is_callable($callback)) {
  122.                 call_user_func($callback, Utils::readline($body));
  123.             } else {
  124.                 echo Utils::readline($body);
  125.             }
  126.         }
  127.     }
  128.  
  129.     /**
  130.      *
  131.      * @param $serverOptions array of server options
  132.      * @return string
  133.      */
  134.     private function makeEndpoint($serverOptions)
  135.     {
  136.         $protocol = $serverOptions['useSsl'] ? 'https' : 'http';
  137.         return $protocol .'://'. $serverOptions['host'].':'.$serverOptions['port'].$serverOptions['path'];
  138.     }
  139. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement