Advertisement
Guest User

Untitled

a guest
Nov 29th, 2018
124
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.14 KB | None | 0 0
  1. Route::get('/send-message', 'ServiceAController@index');
  2. Route::get('/receive-message', 'ServiceBController@index');
  3.  
  4. public function index(Request $request){
  5. //Returning status 200 and sending message if amount is in range
  6. if( (-100000000 <= $request->amount ) && ($request->amount <= 100000000 )){
  7. //Sending message to RabbitMQ
  8. $amount = $request->amount;
  9. $currency = $request->currency;
  10.  
  11. //Saving request data to variable to publish it
  12. $messageContent = json_encode([
  13. 'amount' => $amount * 100,
  14. 'currency' => $currency,
  15. ]);
  16.  
  17. //Sending broker message
  18. $host = 'secret';
  19. $port = 5672;
  20. $user = 'secret';
  21. $pass = 'secret';
  22. $vhost = 'secret';
  23. $exchange = 'balance';
  24. $queue = 'local_balance';
  25.  
  26. $connection = new AMQPStreamConnection($host, $port, $user, $pass, $vhost);
  27. $channel = $connection->channel();
  28. /*
  29. The following code is the same both in the consumer and the producer.
  30. In this way we are sure we always have a queue to consume from and an
  31. exchange where to publish messages.
  32. */
  33. /*
  34. name: $queue
  35. passive: false
  36. durable: true // the queue will survive server restarts
  37. exclusive: false // the queue can be accessed in other channels
  38. auto_delete: false //the queue won't be deleted once the channel is closed.
  39. */
  40. $channel->queue_declare($queue, false, true, false, false);
  41. /*
  42. name: $exchange
  43. type: direct
  44. passive: false
  45. durable: true // the exchange will survive server restarts
  46. auto_delete: false //the exchange won't be deleted once the channel is closed.
  47. */
  48. $channel->exchange_declare($exchange, 'direct', false, true, false);
  49. $channel->queue_bind($queue, $exchange);
  50. $messageBody = $messageContent;
  51. $message = new AMQPMessage($messageBody, ['content_type' => 'application/json', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
  52. $channel->basic_publish($message, $exchange);
  53. $channel->close();
  54. $connection->close();
  55.  
  56. //Returning json response of HTTP payload
  57. $response = json_encode([
  58. 'amount' => +number_format($amount, 2, '.', ''),
  59. 'currency' => $currency,
  60. ]);
  61. return $response;
  62. }else{
  63. //Returning status 400 if amount is not in acceptable range
  64. abort(400, 'Amount is not in acceptable range'); //Returning code 400 if condition isn't met
  65. }
  66. }
  67.  
  68. public function index(){
  69.  
  70. $host = 'secret';
  71. $port = 5672;
  72. $user = 'secret';
  73. $pass = 'secret';
  74. $vhost = 'secret';
  75. $exchange = 'balance';
  76. $queue = 'local_balance';
  77.  
  78. $connection = new AMQPStreamConnection($host, $port, $user, $pass, $vhost);
  79. $channel = $connection->channel();
  80. /*
  81. The following code is the same both in the consumer and the producer.
  82. In this way we are sure we always have a queue to consume from and an
  83. exchange where to publish messages.
  84. */
  85. /*
  86. name: $queue
  87. passive: false
  88. durable: true // the queue will survive server restarts
  89. exclusive: false // the queue can be accessed in other channels
  90. auto_delete: false //the queue won't be deleted once the channel is closed.
  91. */
  92. $channel->queue_declare($queue, false, true, false, false);
  93. /*
  94. name: $exchange
  95. type: direct
  96. passive: false
  97. durable: true // the exchange will survive server restarts
  98. auto_delete: false //the exchange won't be deleted once the channel is closed.
  99. */
  100. $channel->exchange_declare($exchange, 'direct', false, true, false);
  101. $channel->queue_bind($queue, $exchange);
  102. /**
  103. * @param AMQPMessage $message
  104. */
  105. function process_message(AMQPMessage $message){
  106. $messageBody = json_decode($message->body);
  107. $amount = $messageBody->amount;
  108. $currency = $messageBody->currency;
  109.  
  110. file_put_contents('C:/xampp/htdocs/nsoft/data' . '.json', $message->body);
  111.  
  112. $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
  113. }
  114. /*
  115. queue: Queue from where to get the messages
  116. consumer_tag: Consumer identifier
  117. no_local: Don't receive messages published by this consumer.
  118. no_ack: Tells the server if the consumer will acknowledge the messages.
  119. exclusive: Request exclusive consumer access, meaning only this consumer can access the queue
  120. nowait:
  121. callback: A PHP Callback
  122. */
  123. $consumerTag = 'local.consumer';
  124. $channel->basic_consume($queue, $consumerTag, false, false, false, false, 'process_message');
  125. /**
  126. * @param PhpAmqpLibChannelAMQPChannel $channel
  127. * @param PhpAmqpLibConnectionAbstractConnection $connection
  128. */
  129. function shutdown($channel, $connection){
  130. $channel->close();
  131. $connection->close();
  132. }
  133.  
  134. register_shutdown_function('shutdown', $channel, $connection);
  135.  
  136. while (count($channel->callbacks)) {
  137. $channel->wait();
  138. }
  139. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement