Advertisement
Guest User

Untitled

a guest
Nov 16th, 2018
79
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.33 KB | None | 0 0
  /**
   * Send an RPC-style request and block until the matching reply arrives.
   *
   * The request is published through the default exchange to the
   * "rpc_queue" queue. Its "reply_to" property is set to $queue, which is
   * declared here and then consumed until a message carrying this call's
   * correlation id is received.
   *
   * NOTE(review): there is no timeout; if no reply with the expected
   * correlation id ever arrives, this method blocks indefinitely.
   *
   * @param string|null $queue     Name of the queue replies are read from (required).
   * @param mixed       $data      Request payload; arrays are JSON-encoded first.
   * @param bool        $permanent Passed as the "durable" flag when declaring $queue.
   * @param array       $params    Extra message properties; "correlation_id" and
   *                               "reply_to" are always overridden by this method.
   *
   * @return string Body of the reply whose correlation id matches the request.
   *
   * @throws Exception When $queue is empty.
   */
  public function push2($queue = null, $data = null, $permanent = false, $params = array())
  {
      // A reply queue name is mandatory: report it, then abort.
      if (empty($queue)) {
          rabbitmq_client_output('You did not specify the [queue] parameter', 'error', 'x');
          throw new Exception("You did not specify the [queue] parameter");
      }

      // Make sure the reply queue exists before anything is routed to it.
      $this->channel->queue_declare($queue, false, $permanent, false, false, false, null, null);

      // Kept from the original implementation so the broker-side 'tasks'
      // exchange still gets declared; it is not otherwise used by this method.
      // Declared on the existing channel instead of opening (and leaking) a
      // fresh channel on every call.
      $this->channel->exchange_declare('tasks', 'fanout', false, false, false);

      // Arrays are sent as JSON; any other payload is sent as given.
      $payload = is_array($data) ? json_encode($data) : $data;

      // A fresh correlation id per call lets the consumer tell this call's
      // reply apart from any other message sitting in the reply queue.
      $corrId = uniqid('', true);
      $this->corrId = $corrId;

      // Forget any reply left over from a previous call.
      $this->response = null;

      $properties = array_merge(
          is_array($params) ? $params : array(),
          array(
              'correlation_id' => $corrId,
              'reply_to'       => $queue,
          )
      );
      $request = new \PhpAmqpLib\Message\AMQPMessage($payload, $properties);

      // Start listening BEFORE publishing so a fast reply cannot be missed.
      // no_ack = true: replies are considered handled as soon as delivered.
      $consumerTag = $this->channel->basic_consume(
          $queue,
          '',
          false,
          true,
          false,
          false,
          function ($reply) use ($corrId) {
              if ($reply->has('correlation_id') && $reply->get('correlation_id') === $corrId) {
                  $this->response = $reply->getBody();
              }
          }
      );

      try {
          $rpcQueue = 'rpc_queue';
          $this->channel->basic_publish($request, '', $rpcQueue);

          // Pump the channel until the callback above stores the reply.
          while ($this->response === null) {
              $this->channel->wait();
          }
      } finally {
          // Stop consuming so repeated calls do not pile up consumers
          // on the shared channel.
          $this->channel->basic_cancel($consumerTag);
      }

      // Return RPC results
      return $this->response;
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement