Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
// RPC-style client: declares the exchange and the reply queue, publishes one
// request message, then consumes from the reply queue until the first reply
// has been printed.
require_once '../vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Message\AMQPMessage;

$server = "localhost";
$port = 5672;
$vhost = "/";
$username = "guest";
$password = "guest";
$exchangeName = "testEx";
$replyQueue = "replyQ";
$replyKey = "reply";
$requestKey = "requestKey";

// Both are assigned inside the try block below. The callback captures them by
// reference so it sees the final values without going through $GLOBALS (which
// only works when this script runs in the global scope).
$connection = null;
$channel = null;
$consumerTag = null;

// Callback invoked for each reply message: print the body, then cancel the
// consumer so the wait loop below terminates after the first reply.
$onMessage = function ($message) use (&$channel, &$consumerTag) {
    echo "In Client : " . $message->body . PHP_EOL;
    $channel->basic_cancel($consumerTag);
};

try {
    // Connect. The sixth constructor parameter of php-amqplib's connection is
    // $insist, not the heartbeat, so the original "$heartbeat = 60" in that
    // slot never configured a heartbeat; it is omitted here.
    $connection = new AMQPConnection($server, $port, $username, $password, $vhost);
    $channel = $connection->channel();

    // Declare the direct exchange: passive = false, durable = false, auto_delete = false.
    $channel->exchange_declare($exchangeName, "direct", false, false, false);

    // Declare and bind the reply queue: passive, durable, exclusive and
    // auto_delete are all false.
    $channel->queue_declare($replyQueue, false, false, false, false);
    $channel->queue_bind($replyQueue, $exchangeName, $replyKey);

    // Listen for replies: empty consumer tag, no_local = false, no_ack = true,
    // exclusive = false, nowait = false.
    $consumerTag = $channel->basic_consume($replyQueue, "", false, true, false, false, $onMessage);

    // Send the request; reply_to carries the routing key the responder should
    // publish its answer with.
    $message = new AMQPMessage(
        "Bilal is the message !",
        array("content_type" => "text/plain", "delivery_mode" => 1, "reply_to" => $replyKey)
    );
    $channel->basic_publish($message, $exchangeName, $requestKey);

    // Process incoming frames until the callback has cancelled the consumer.
    while (count($channel->callbacks)) {
        $channel->wait();
    }
} catch (Exception $e) {
    echo $e . PHP_EOL;
}

// Disconnect on both the success and the failure path so the socket is not
// left open when an exception interrupted the exchange above.
if ($connection !== null) {
    try {
        $connection->close();
    } catch (Exception $closeError) {
        echo $closeError . PHP_EOL;
    }
}
// Request server ("Server1"): consumes request messages from the request
// queue and publishes a reply to the routing key given in each message's
// reply_to header. Reconnects after any exception, pausing 5 seconds first.
require_once '../vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Message\AMQPMessage;

$server = "localhost";
$port = 5672;
$vhost = "/";
$username = "guest";
$password = "guest";
$exchangeName = "testEx";
$requestQueue = "requestQ";
// NOTE(review): the client script in this paste publishes with routing key
// "requestKey" while this binding uses "request" - confirm this is intended.
$requestKey = "request";

// Reassigned on every (re)connect; the callback captures it by reference so
// it always talks to the channel of the current connection.
$channel = null;

// Callback invoked for each request message: reply to the reply_to header,
// then acknowledge. If building or publishing the reply throws, the delivery
// is negatively acknowledged instead.
$onMessage = function ($message) use (&$channel, $exchangeName) {
    echo "In Server1 : " . $message->body . PHP_EOL;
    try {
        $replyMessage = new AMQPMessage(
            "Server 1 : Reply to " . $message->body,
            array("content_type" => "text/plain", "delivery_mode" => 1)
        );
        $channel->basic_publish($replyMessage, $exchangeName, $message->get("reply_to"));
        $channel->basic_ack($message->delivery_info["delivery_tag"]);
    } catch (Exception $e) {
        $channel->basic_nack($message->delivery_info["delivery_tag"]);
    }
};

while (true) {
    // Reset per attempt so the catch block never sees an undefined variable
    // (first connect fails) or a stale connection from a previous attempt.
    $connection = null;
    try {
        // Connect. The sixth constructor parameter of php-amqplib's connection
        // is $insist, not the heartbeat, so the original "$heartbeat = 60" in
        // that slot never configured a heartbeat; it is omitted here.
        $connection = new AMQPConnection($server, $port, $username, $password, $vhost);
        $channel = $connection->channel();

        // Declare exchange and queue (passive, durable, exclusive and
        // auto_delete all false), bind them and start consuming with manual
        // acknowledgements (no_ack = false).
        $channel->exchange_declare($exchangeName, "direct", false, false, false);
        $channel->queue_declare($requestQueue, false, false, false, false);
        $channel->queue_bind($requestQueue, $exchangeName, $requestKey);
        $channel->basic_consume($requestQueue, "", false, false, false, false, $onMessage);

        // Process incoming frames for as long as a consumer is registered.
        while (count($channel->callbacks)) {
            $channel->wait();
        }

        // The consume loop ended without an exception: release this
        // connection before the outer loop opens a new one.
        $connection->close();
    } catch (Exception $e) {
        // Reconnect on exception.
        echo "Exception handled, reconnecting...\nDetail:\n" . $e . PHP_EOL;
        if ($connection !== null) {
            try {
                $connection->close();
            } catch (Exception $e1) {
                // Best effort: the connection is already broken, nothing more to do.
            }
        }
        sleep(5);
    }
}
// Request server ("Server2"): consumes request messages from the request
// queue and publishes a reply to the routing key given in each message's
// reply_to header. Reconnects after any exception, pausing 5 seconds first.
require_once '../vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Message\AMQPMessage;

$server = "localhost";
$port = 5672;
$vhost = "/";
$username = "guest";
$password = "guest";
$exchangeName = "testEx";
$requestQueue = "requestQ";
$requestKey = "requestKey";

// Reassigned on every (re)connect; the callback captures it by reference so
// it always talks to the channel of the current connection.
$channel = null;

// Callback invoked for each request message: reply to the reply_to header,
// then acknowledge. If building or publishing the reply throws, the delivery
// is negatively acknowledged instead.
$onMessage = function ($message) use (&$channel, $exchangeName) {
    echo "In Server2 : " . $message->body . PHP_EOL;
    try {
        $replyMessage = new AMQPMessage(
            "Server 2 : Reply to " . $message->body,
            array("content_type" => "text/plain", "delivery_mode" => 1)
        );
        $channel->basic_publish($replyMessage, $exchangeName, $message->get("reply_to"));
        $channel->basic_ack($message->delivery_info["delivery_tag"]);
    } catch (Exception $e) {
        $channel->basic_nack($message->delivery_info["delivery_tag"]);
    }
};

while (true) {
    // Reset per attempt so the catch block never sees an undefined variable
    // (first connect fails) or a stale connection from a previous attempt.
    $connection = null;
    try {
        // Connect. The sixth constructor parameter of php-amqplib's connection
        // is $insist, not the heartbeat, so the original "$heartbeat = 60" in
        // that slot never configured a heartbeat; it is omitted here.
        $connection = new AMQPConnection($server, $port, $username, $password, $vhost);
        $channel = $connection->channel();

        // Declare exchange and queue (passive, durable, exclusive and
        // auto_delete all false), bind them and start consuming with manual
        // acknowledgements (no_ack = false).
        $channel->exchange_declare($exchangeName, "direct", false, false, false);
        $channel->queue_declare($requestQueue, false, false, false, false);
        $channel->queue_bind($requestQueue, $exchangeName, $requestKey);
        $channel->basic_consume($requestQueue, "", false, false, false, false, $onMessage);

        // Process incoming frames for as long as a consumer is registered.
        while (count($channel->callbacks)) {
            $channel->wait();
        }

        // The consume loop ended without an exception: release this
        // connection before the outer loop opens a new one.
        $connection->close();
    } catch (Exception $e) {
        // Reconnect on exception.
        echo "Exception handled, reconnecting...\nDetail:\n" . $e . PHP_EOL;
        if ($connection !== null) {
            try {
                $connection->close();
            } catch (Exception $e1) {
                // Best effort: the connection is already broken, nothing more to do.
            }
        }
        sleep(5);
    }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement