Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
/*
 * Event-loop entry point for the KuCoin trade-order listener.
 *
 * For every URL in $urls a websocket connection is opened, subscribed to the
 * private '/spotMarket/tradeOrders' topic and kept alive with a periodic ping.
 * Each connection is drained by its own coroutine, so a quiet socket can never
 * delay messages from another socket or the shutdown check. The main coroutine
 * only polls Redis once per second for the stop flag
 * 'laravel_database_handlerRedisFlagKucoin'; when the flag appears it is
 * deleted, ping timers are cancelled, connections are closed and the loop is
 * stopped.
 *
 * NOTE(review): $pingInterval, $startTime and $symbols are captured but not
 * used; the ping period stays hard-coded at 10000 ms as in the original.
 * Confirm whether $pingInterval was meant to drive it (and its unit).
 * NOTE(review): every Redis trace key is suffixed with time() and written
 * without a TTL, so entries produced within the same second overwrite each
 * other and keys accumulate indefinitely - confirm this is acceptable.
 */
Loop::run(function () use ($urls, $pingInterval, $startTime, $symbols, $redis) {
    $connectionMessage = [
        'type' => 'subscribe',
        'topic' => '/spotMarket/tradeOrders',
        'privateChannel' => true,
        'response' => true,
    ];

    /*
     * Stores a trace of one raw frame in Redis and forwards order data to the
     * listener. Runs as its own coroutine so slow Redis writes do not hold up
     * the socket reader that spawned it.
     */
    $handlePayload = function (string $rawPayload) use ($redis) {
        yield $redis->set('asyncCallPayload' . time(), $rawPayload);

        $payload = json_decode($rawPayload, true);
        if (!is_array($payload)) {
            // Not a JSON object/array: nothing below could match.
            return;
        }

        if (!empty($payload['type'])) {
            if ($payload['type'] === 'welcome') {
                yield $redis->set('socketResWelcome' . time(), json_encode($payload));
            } elseif ($payload['type'] !== 'pong') {
                // Anything that is neither the handshake nor a ping reply.
                yield $redis->set('socketResOther' . time(), json_encode($payload));
            }
        }

        if (!empty($payload['subject']) && !empty($payload['data'])) {
            yield $redis->set('socketResData' . time(), json_encode($payload));
            $this->executeKucoinOrderListenerProcessing($payload['data']);
        }
    };

    /** @var Client\Connection[] $connections */
    $connections = [];
    /** @var string[] $pingWatchers Loop watcher ids of the keep-alive timers. */
    $pingWatchers = [];

    foreach ($urls as $url) {
        /** @var Client\Connection $connect */
        $connect = yield connect($url);
        // Await the subscribe frame so a failed subscription surfaces here
        // instead of being silently dropped.
        yield $connect->send(json_encode($connectionMessage));

        // Keep-alive ping. The send promise is deliberately not awaited, as in
        // the original: a failed ping must not take the whole loop down.
        $pingWatcher = Loop::repeat(10000, function () use ($connect) {
            $connect->send(json_encode(['type' => 'ping']));
        });

        $connections[] = $connect;
        $pingWatchers[] = $pingWatcher;

        // One reader per connection: receive() is awaited independently, so
        // connections no longer block each other.
        Amp\asyncCall(function () use ($connect, $pingWatcher, $handlePayload) {
            try {
                while (true) {
                    /** @var Websocket\Message|null $message */
                    $message = yield $connect->receive();
                    if ($message === null) {
                        // receive() resolves to null once the connection is closed.
                        break;
                    }

                    $payload = yield $message->buffer();
                    Amp\asyncCall($handlePayload, $payload);
                }
            } finally {
                // Never keep pinging a connection that is no longer read.
                Loop::cancel($pingWatcher);
            }
        });
    }

    // Poll the stop flag once per second, independent of socket traffic.
    while (!(yield $redis->has('laravel_database_handlerRedisFlagKucoin'))) {
        yield new Delayed(1000);
    }

    yield $redis->set('connectionIsCloseDelete' . time(), true);
    yield $redis->delete('laravel_database_handlerRedisFlagKucoin');

    // Release timers and sockets explicitly: Loop::stop() alone leaves them
    // registered on the loop driver.
    foreach ($pingWatchers as $pingWatcher) {
        Loop::cancel($pingWatcher);
    }

    $closing = [];
    foreach ($connections as $connection) {
        $closing[] = $connection->close();
    }
    // any() never fails, so one broken socket cannot abort the shutdown.
    yield Amp\Promise\any($closing);

    Loop::stop();
});
Advertisement
Add Comment
Please, Sign In to add comment