Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- Route::get('/send-message', 'ServiceAController@index');
- Route::get('/receive-message', 'ServiceBController@index');
- public function index(Request $request){
- //Returning status 200 and sending message if amount is in range
- if( (-100000000 <= $request->amount ) && ($request->amount <= 100000000 )){
- //Sending message to RabbitMQ
- $amount = $request->amount;
- $currency = $request->currency;
- //Saving request data to variable to publish it
- $messageContent = json_encode([
- 'amount' => $amount * 100,
- 'currency' => $currency,
- ]);
- //Sending broker message
- $host = 'secret';
- $port = 5672;
- $user = 'secret';
- $pass = 'secret';
- $vhost = 'secret';
- $exchange = 'balance';
- $queue = 'local_balance';
- $connection = new AMQPStreamConnection($host, $port, $user, $pass, $vhost);
- $channel = $connection->channel();
- /*
- The following code is the same both in the consumer and the producer.
- In this way we are sure we always have a queue to consume from and an
- exchange where to publish messages.
- */
- /*
- name: $queue
- passive: false
- durable: true // the queue will survive server restarts
- exclusive: false // the queue can be accessed in other channels
- auto_delete: false //the queue won't be deleted once the channel is closed.
- */
- $channel->queue_declare($queue, false, true, false, false);
- /*
- name: $exchange
- type: direct
- passive: false
- durable: true // the exchange will survive server restarts
- auto_delete: false //the exchange won't be deleted once the channel is closed.
- */
- $channel->exchange_declare($exchange, 'direct', false, true, false);
- $channel->queue_bind($queue, $exchange);
- $messageBody = $messageContent;
- $message = new AMQPMessage($messageBody, ['content_type' => 'application/json', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
- $channel->basic_publish($message, $exchange);
- $channel->close();
- $connection->close();
- //Returning json response of HTTP payload
- $response = json_encode([
- 'amount' => +number_format($amount, 2, '.', ''),
- 'currency' => $currency,
- ]);
- return $response;
- }else{
- //Returning status 400 if amount is not in acceptable range
- abort(400, 'Amount is not in acceptable range'); //Returning code 400 if condition isn't met
- }
- }
- public function index(){
- $host = 'secret';
- $port = 5672;
- $user = 'secret';
- $pass = 'secret';
- $vhost = 'secret';
- $exchange = 'balance';
- $queue = 'local_balance';
- $connection = new AMQPStreamConnection($host, $port, $user, $pass, $vhost);
- $channel = $connection->channel();
- /*
- The following code is the same both in the consumer and the producer.
- In this way we are sure we always have a queue to consume from and an
- exchange where to publish messages.
- */
- /*
- name: $queue
- passive: false
- durable: true // the queue will survive server restarts
- exclusive: false // the queue can be accessed in other channels
- auto_delete: false //the queue won't be deleted once the channel is closed.
- */
- $channel->queue_declare($queue, false, true, false, false);
- /*
- name: $exchange
- type: direct
- passive: false
- durable: true // the exchange will survive server restarts
- auto_delete: false //the exchange won't be deleted once the channel is closed.
- */
- $channel->exchange_declare($exchange, 'direct', false, true, false);
- $channel->queue_bind($queue, $exchange);
- /**
- * @param AMQPMessage $message
- */
- function process_message(AMQPMessage $message){
- $messageBody = json_decode($message->body);
- $amount = $messageBody->amount;
- $currency = $messageBody->currency;
- file_put_contents('C:/xampp/htdocs/nsoft/data' . '.json', $message->body);
- $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
- }
- /*
- queue: Queue from where to get the messages
- consumer_tag: Consumer identifier
- no_local: Don't receive messages published by this consumer.
- no_ack: Tells the server if the consumer will acknowledge the messages.
- exclusive: Request exclusive consumer access, meaning only this consumer can access the queue
- nowait:
- callback: A PHP Callback
- */
- $consumerTag = 'local.consumer';
- $channel->basic_consume($queue, $consumerTag, false, false, false, false, 'process_message');
- /**
- * @param PhpAmqpLibChannelAMQPChannel $channel
- * @param PhpAmqpLibConnectionAbstractConnection $connection
- */
- function shutdown($channel, $connection){
- $channel->close();
- $connection->close();
- }
- register_shutdown_function('shutdown', $channel, $connection);
- while (count($channel->callbacks)) {
- $channel->wait();
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement