Advertisement
Guest User

Untitled

a guest
Apr 1st, 2017
99
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.67 KB | None | 0 0
  // Request/reply client: declares the exchange and the reply queue, publishes
  // one request message, prints the first reply it receives, then stops
  // consuming and disconnects.
  require_once '../vendor/autoload.php';
  use PhpAmqpLib\Connection\AMQPConnection;
  use PhpAmqpLib\Message\AMQPMessage;

  $server = "localhost";
  $port = 5672;
  $vhost = "/";
  $username = "guest";
  $password = "guest";
  $exchangeName = "testEx";
  $replyQueue = "replyQ";
  $replyKey = "reply";

  $requestKey = "requestKey";

  // Initialised up front so the reply callback can capture them by reference;
  // the real values are assigned once the connection is established below.
  $connection = null;
  $channel = null;
  $consumerTag = null;

  // Callback function on receiving reply messages.
  $onMessage = function ($message) use (&$channel, &$consumerTag) {
      echo "In Client : " . $message->body.PHP_EOL;
      // Stop consuming once the reply is received; this empties
      // $channel->callbacks and so ends the wait loop below.
      $channel->basic_cancel($consumerTag);
  };

  try {
      // Connect.
      // NOTE(review): the sixth positional argument is labelled $heartbeat here,
      // but it may map to a different constructor parameter in the installed
      // php-amqplib version — confirm against the constructor signature.
      $connection = new AMQPConnection($server, $port, $username, $password, $vhost, $heartbeat = 60);
      $channel = $connection->channel();

      // Declare exchange and queue.
      $channel->exchange_declare($exchangeName, $type = "direct", false, false, $auto_delete = false);

      // Listen for reply messages.
      $channel->queue_declare($replyQueue, false, false, $exclusive = false, $auto_delete = false);
      $channel->queue_bind($replyQueue, $exchangeName, $replyKey);
      $consumerTag = $channel->basic_consume($replyQueue, "", false, $no_ack = true, false, false, $callback = $onMessage);

      // Send the request message; reply_to carries the routing key the
      // responder should publish its reply with.
      $message = new AMQPMessage("Bilal is the message !", array("content_type" => "text/plain", "delivery_mode" => 1, "reply_to" => $replyKey));
      $channel->basic_publish($message, $exchangeName, $requestKey);

      // Keep waiting for deliveries while a consumer callback is registered.
      while (count($channel->callbacks)) {
          $channel->wait();
      }

      // Disconnect.
      $connection->close();
  } catch (Exception $e) {
      echo $e.PHP_EOL;
      // Best-effort cleanup: do not leave the connection open after a failure.
      // A failure while closing is deliberately ignored because the original
      // error has already been reported.
      if ($connection !== null) {
          try {
              $connection->close();
          } catch (Exception $closeError) {
          }
      }
  }
  53.  
  // Request/reply server 1: consumes request messages from the request queue
  // and publishes a reply using the routing key found in each message's
  // reply_to property. Reconnects after any exception.
  require_once '../vendor/autoload.php';
  use PhpAmqpLib\Connection\AMQPConnection;
  use PhpAmqpLib\Message\AMQPMessage;

  $server = "localhost";
  $port = 5672;
  $vhost = "/";
  $username = "guest";
  $password = "guest";
  $exchangeName = "testEx";
  $requestQueue = "requestQ";

  // NOTE(review): the client script in this paste publishes with routing key
  // "requestKey", while this server binds "request" — confirm this difference
  // is intentional.
  $requestKey = "request";

  // Initialised up front so the callback can capture the channel by reference;
  // it is (re)assigned on every (re)connect in the loop below.
  $connection = null;
  $channel = null;

  // Callback function on receiving request messages: reply to the reply_to header.
  $onMessage = function ($message) use (&$channel, $exchangeName) {
      echo "In Server1 : " . $message->body . PHP_EOL;

      try {
          $replyMessage = new AMQPMessage("Server 1 : Reply to ".$message->body, array("content_type" => "text/plain", "delivery_mode" => 1));
          $channel->basic_publish($replyMessage, $exchangeName, $message->get("reply_to"));
          // Acknowledge only after the reply has been published.
          $channel->basic_ack($message->delivery_info["delivery_tag"]);
      } catch (Exception $e) {
          // Replying failed: negatively acknowledge the request instead.
          $channel->basic_nack($message->delivery_info["delivery_tag"]);
      }
  };

  while (true) {
      // Reset per attempt so the catch block never closes a stale or
      // undefined connection when the connect call itself fails.
      $connection = null;

      try {
          // Connect.
          // NOTE(review): the sixth positional argument is labelled $heartbeat
          // here, but it may map to a different constructor parameter in the
          // installed php-amqplib version — confirm against the signature.
          $connection = new AMQPConnection($server, $port, $username, $password, $vhost, $heartbeat = 60);
          $channel = $connection->channel();

          // Declare exchange and queue, bind them and consume messages.
          $channel->exchange_declare($exchangeName, $type = "direct", false, false, $auto_delete = false);
          $channel->queue_declare($requestQueue, false, false, $exclusive = false, $auto_delete = false);
          $channel->queue_bind($requestQueue, $exchangeName, $requestKey);
          $channel->basic_consume($requestQueue, "", false, $no_ack = false, false, false, $callback = $onMessage);

          // Keep waiting for deliveries while a consumer callback is registered.
          while (count($channel->callbacks)) {
              $channel->wait();
          }
      } catch (Exception $e) {
          // Reconnect on exception.
          echo "Exception handled, reconnecting...\nDetail:\n".$e.PHP_EOL;
          if ($connection !== null) {
              try {
                  $connection->close();
              } catch (Exception $e1) {
                  // Ignored on purpose: the connection is being discarded anyway.
              }
          }
          // Back off before the next connection attempt.
          sleep(5);
      }
  }
  111.  
  // Request/reply server 2: consumes request messages from the request queue
  // and publishes a reply using the routing key found in each message's
  // reply_to property. Reconnects after any exception.
  require_once '../vendor/autoload.php';
  use PhpAmqpLib\Connection\AMQPConnection;
  use PhpAmqpLib\Message\AMQPMessage;

  $server = "localhost";
  $port = 5672;
  $vhost = "/";
  $username = "guest";
  $password = "guest";
  $exchangeName = "testEx";
  $requestQueue = "requestQ";

  $requestKey = "requestKey";

  // Initialised up front so the callback can capture the channel by reference;
  // it is (re)assigned on every (re)connect in the loop below.
  $connection = null;
  $channel = null;

  // Callback function on receiving request messages: reply to the reply_to header.
  $onMessage = function ($message) use (&$channel, $exchangeName) {
      echo "In Server2 : " . $message->body . PHP_EOL;

      try {
          $replyMessage = new AMQPMessage("Server 2 : Reply to ".$message->body, array("content_type" => "text/plain", "delivery_mode" => 1));
          $channel->basic_publish($replyMessage, $exchangeName, $message->get("reply_to"));
          // Acknowledge only after the reply has been published.
          $channel->basic_ack($message->delivery_info["delivery_tag"]);
      } catch (Exception $e) {
          // Replying failed: negatively acknowledge the request instead.
          $channel->basic_nack($message->delivery_info["delivery_tag"]);
      }
  };

  while (true) {
      // Reset per attempt so the catch block never closes a stale or
      // undefined connection when the connect call itself fails.
      $connection = null;

      try {
          // Connect.
          // NOTE(review): the sixth positional argument is labelled $heartbeat
          // here, but it may map to a different constructor parameter in the
          // installed php-amqplib version — confirm against the signature.
          $connection = new AMQPConnection($server, $port, $username, $password, $vhost, $heartbeat = 60);
          $channel = $connection->channel();

          // Declare exchange and queue, bind them and consume messages.
          $channel->exchange_declare($exchangeName, $type = "direct", false, false, $auto_delete = false);
          $channel->queue_declare($requestQueue, false, false, $exclusive = false, $auto_delete = false);
          $channel->queue_bind($requestQueue, $exchangeName, $requestKey);
          $channel->basic_consume($requestQueue, "", false, $no_ack = false, false, false, $callback = $onMessage);

          // Keep waiting for deliveries while a consumer callback is registered.
          while (count($channel->callbacks)) {
              $channel->wait();
          }
      } catch (Exception $e) {
          // Reconnect on exception.
          echo "Exception handled, reconnecting...\nDetail:\n".$e.PHP_EOL;
          if ($connection !== null) {
              try {
                  $connection->close();
              } catch (Exception $e1) {
                  // Ignored on purpose: the connection is being discarded anyway.
              }
          }
          // Back off before the next connection attempt.
          sleep(5);
      }
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement