Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- <?php
- use Rx\Observable;
- use Rx\ObserverInterface;
- use Clue\React\Redis\Factory;
- use Clue\React\Redis\Client;
- require_once __DIR__ . '/../vendor/autoload.php';
- $loop = \React\EventLoop\Factory::create();
- $factory = new Factory($loop);
- function asString($value)
- {
- if (is_array($value)) {
- return json_encode($value);
- }
- return (string) $value;
- }
- $createStdoutObserver = function ($prefix = '') {
- return new Rx\Observer\CallbackObserver(
- function ($value) use ($prefix) {
- echo $prefix . "Next value: " . asString($value) . "\n";
- },
- function ($error) use ($prefix) {
- echo $prefix . "Exception: " . $error->getMessage() . "\n";
- },
- function () use ($prefix) {
- echo $prefix . "Complete!\n";
- }
- );
- };
- $stdoutObserver = $createStdoutObserver();
- $factory->createClient('127.0.0.1:6379')
- ->then(function (Client $client) use ($stdoutObserver, $loop) {
- $scheduler = new \Rx\Scheduler\EventLoopScheduler($loop);
- $obs = \Rx\Observable::create(function (ObserverInterface $observer) use ($client) {
- $client->blpop('queue', 10)
- ->then(function ($result) use ($observer) {
- if ($result) {
- $observer->onNext($result);
- }
- $observer->onCompleted();
- });
- })->repeatWhen(function (\Rx\Observable $o) {
- return $o->concatMap(function ($value) {
- return Observable::timer(20);
- });
- });
- $obs->subscribe($stdoutObserver, $scheduler);
- }, function (Exception $err) {
- echo $err->getMessage();
- });
- $loop->run();
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement