Advertisement
Guest User

Untitled

a guest
Feb 27th, 2020
98
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.31 KB | None | 0 0
  1. package com.onewin;
  2.  
  3. import com.rabbitmq.client.ConnectionFactory;
  4. import com.rabbitmq.client.Connection;
  5. import com.rabbitmq.client.Channel;
  6. import com.rabbitmq.client.QueueingConsumer;
  7. import com.rabbitmq.client.AMQP.BasicProperties;
  8.  
  9. public class RPCServer {
  10.  
  11. private static final String RPC_QUEUE_NAME = "rpc_queue";
  12.  
  13. private static int fib(int n) {
  14. if (n ==0) return 0;
  15. if (n == 1) return 1;
  16. return fib(n-1) + fib(n-2);
  17. }
  18.  
  19. public static void main(String[] argv) {
  20. Connection connection = null;
  21. Channel channel = null;
  22. try {
  23. ConnectionFactory factory = new ConnectionFactory();
  24.  
  25. // default is localhost, with guest/guest credentials
  26.  
  27. connection = factory.newConnection();
  28. channel = connection.createChannel();
  29.  
  30. // this queue declaration also creates an implicit binding to the default exchange with the queue name as binding key
  31. channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
  32.  
  33. // This tells RabbitMQ not to give more than one message to the server at a time
  34. // In other words, don't dispatch a new message to the server until it has processed and acknowledged the previous one.
  35. channel.basicQos(1);
  36.  
  37. QueueingConsumer consumer = new QueueingConsumer(channel);
  38.  
  39. // using the explicit acknowledgement model
  40. channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
  41.  
  42. System.out.println(" [x] Awaiting RPC requests");
  43.  
  44. while (true) {
  45. String response = null;
  46.  
  47. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  48.  
  49. BasicProperties props = delivery.getProperties();
  50. BasicProperties replyProps = new BasicProperties
  51. .Builder()
  52. .correlationId(props.getCorrelationId())
  53. .build();
  54.  
  55. try {
  56. String message = new String(delivery.getBody(),"UTF-8");
  57. int n = Integer.parseInt(message);
  58.  
  59. System.out.println(" [.] fib(" + message + ")");
  60. response = "" + fib(n);
  61. }
  62. catch (Exception e){
  63. System.out.println(" [.] " + e.toString());
  64. response = "";
  65. }
  66. finally {
  67. channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes("UTF-8"));
  68.  
  69. // here it is possible that the server dies without sending an acknowledgment for the request.
  70. // if that happens the restarted RPC server will process the request again.
  71. // That's why on the client we must handle the duplicate responses gracefully, and the RPC should ideally be idempotent.
  72.  
  73. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  74. }
  75. }
  76. }
  77. catch (Exception e) {
  78. e.printStackTrace();
  79. }
  80. finally {
  81. if (connection != null) {
  82. try {
  83. connection.close();
  84. }
  85. catch (Exception ignore) {}
  86. }
  87. }
  88. }
  89. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement