Advertisement
Guest User

Untitled

a guest
Jan 24th, 2017
103
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.68 KB | None | 0 0
  1. <?php
  2. use Rx\Observable;
  3. use Rx\ObserverInterface;
  4. use Clue\React\Redis\Factory;
  5. use Clue\React\Redis\Client;
  6.  
  7. require_once __DIR__ . '/../vendor/autoload.php';
  8.  
  // Event loop that drives every asynchronous operation in this script.
  $loop = \React\EventLoop\Factory::create();

  // Redis client factory bound to that loop (same class as the imported `Factory` alias).
  $factory = new \Clue\React\Redis\Factory($loop);
  11.  
  /**
   * Render an arbitrary value as a printable string.
   *
   * Arrays, and objects that have no __toString() method, are JSON-encoded;
   * every other value goes through a plain (string) cast.
   *
   * The function always returns a string. json_encode() returns false when it
   * cannot encode its input (for example a string that is not valid UTF-8), so
   * a failed encode is retried with JSON_PARTIAL_OUTPUT_ON_ERROR and, if that
   * also fails, replaced by a placeholder carrying the JSON error message.
   *
   * @param mixed $value Value to render.
   *
   * @return string Printable representation of $value.
   */
  function asString($value)
  {
      $needsJson = is_array($value)
          || (is_object($value) && !method_exists($value, '__toString'));

      if (!$needsJson) {
          return (string) $value;
      }

      $json = json_encode($value);

      if ($json === false) {
          // Retry, letting json_encode substitute the parts it cannot encode
          // instead of failing the whole value.
          $json = json_encode($value, JSON_PARTIAL_OUTPUT_ON_ERROR);
      }

      if ($json === false) {
          return '[unencodable value: ' . json_last_error_msg() . ']';
      }

      return $json;
  }
  19.  
  /**
   * Factory for observers that report stream events on standard output.
   *
   * Each line written by the returned observer starts with $prefix:
   *  - a value is printed as "Next value: ..." (rendered via asString()),
   *  - an error is printed as "Exception: ..." using the error's message,
   *  - completion is printed as "Complete!".
   *
   * @param string $prefix Text prepended to every printed line.
   *
   * @return Rx\Observer\CallbackObserver
   */
  $createStdoutObserver = function ($prefix = '') {
      $printNext = function ($value) use ($prefix) {
          $line = $prefix . "Next value: " . asString($value) . "\n";
          echo $line;
      };

      $printError = function ($error) use ($prefix) {
          $line = $prefix . "Exception: " . $error->getMessage() . "\n";
          echo $line;
      };

      $printDone = function () use ($prefix) {
          echo $prefix . "Complete!\n";
      };

      return new Rx\Observer\CallbackObserver($printNext, $printError, $printDone);
  };
  33.  
  34.  
  // Observer that prints every event of the polling stream to stdout.
  $stdoutObserver = $createStdoutObserver();

  // Connect to the local Redis server, then poll the list "queue" forever:
  // one BLPOP per subscription, re-subscribing shortly after each completion.
  $factory->createClient('127.0.0.1:6379')
      ->then(function (Client $client) use ($stdoutObserver, $loop) {
          $scheduler = new \Rx\Scheduler\EventLoopScheduler($loop);

          // A single subscription issues one BLPOP with a timeout argument of 10
          // (seconds, per the Redis BLPOP command) and then completes.
          $obs = \Rx\Observable::create(function (ObserverInterface $observer) use ($client) {
              $client->blpop('queue', 10)
                  ->then(
                      function ($result) use ($observer) {
                          // An empty result (e.g. the BLPOP timed out) emits nothing.
                          if ($result) {
                              $observer->onNext($result);
                          }
                          $observer->onCompleted();
                      },
                      function (Exception $error) use ($observer) {
                          // Forward a failed command to the observer; without this
                          // handler the rejection is dropped and the polling loop
                          // stops without any output.
                          $observer->onError($error);
                      }
                  );
          })->repeatWhen(function (\Rx\Observable $o) {
              // Each completion notification is turned into a timer(20), so the
              // source is re-subscribed after that delay
              // (NOTE(review): RxPHP timer takes milliseconds - confirm).
              return $o->concatMap(function ($value) {
                  return Observable::timer(20);
              });
          });

          $obs->subscribe($stdoutObserver, $scheduler);
      }, function (Exception $err) {
          // Creating the Redis client failed.
          echo $err->getMessage();
      });

  // Hand control to the event loop.
  $loop->run();
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement