Guest User

Untitled

a guest
Jan 21st, 2018
114
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 8.14 KB | None | 0 0
  1. import java.io.IOException;
  2. import java.util.Collections;
  3. import java.util.List;
  4. import java.util.UUID;
  5. import java.util.concurrent.Executors;
  6. import java.util.concurrent.atomic.AtomicBoolean;
  7.  
  8. import com.rabbitmq.client.AMQP.BasicProperties;
  9. import com.rabbitmq.client.Address;
  10. import com.rabbitmq.client.AlreadyClosedException;
  11. import com.rabbitmq.client.Channel;
  12. import com.rabbitmq.client.Connection;
  13. import com.rabbitmq.client.ConnectionFactory;
  14. import com.rabbitmq.client.DefaultConsumer;
  15. import com.rabbitmq.client.Envelope;
  16. import com.rabbitmq.client.Recoverable;
  17. import com.rabbitmq.client.RecoverableConnection;
  18. import com.rabbitmq.client.RecoveryListener;
  19. import com.rabbitmq.client.ShutdownListener;
  20. import com.rabbitmq.client.ShutdownSignalException;
  21. import com.rabbitmq.client.impl.ForgivingExceptionHandler;
  22.  
  23. import org.slf4j.Logger;
  24. import org.slf4j.LoggerFactory;
  25.  
  26. /**
  27. * This test is will recreate a recovery channel failure that causes the channel to be closed with reason: "reply-code=406,
  28. * reply-text=PRECONDITION_FAILED - unknown delivery tag 0, class-id=60, method-id=80)".
  29. *
  30. * When a thread tries to ack an in-flight message delivered on the old channel in the middle of recovery on the new channel an exception is raised.
  31. *
  32. * To recreate drop the Subscriber connection thru the management UI until the error is raised, the channels
  33. * error out, and messages stop being delivered. Normally it only takes 1 or 2 drops to recreate.
  34. */
  35. public class PreconditionFailedRecoveryTest {
  36.  
  37. private static final Logger logger;
  38. static {
  39. System.setProperty("org.slf4j.simpleLogger.showDateTime", "true");
  40. System.setProperty("org.slf4j.simpleLogger.dateTimeFormat", "yyyy-MM-dd'T'HH:mm:ss.SSS");
  41. logger = LoggerFactory.getLogger(PreconditionFailedRecoveryTest.class); // I'm using slf4j-simple to make this go to stderr
  42. }
  43.  
  44. public static void main(final String[] args) throws Exception {
  45. final String node1 = "node1";
  46. final String node2 = "node2";
  47. final String user = "user";
  48. final String pass = "pass";
  49. final String exchangeName = "ibus.topic.loadtest1";
  50. final String queueName = "ibus-d-s-loadtest1-RecoveryTest";
  51. final int consumerCount = 10;
  52.  
  53. final ConnectionFactory cf = new ConnectionFactory();
  54. cf.setUsername(user);
  55. cf.setPassword(pass);
  56. cf.setAutomaticRecoveryEnabled(true);
  57. cf.setTopologyRecoveryEnabled(true);
  58. cf.setRequestedHeartbeat(30);
  59. cf.setExceptionHandler(new ForgivingExceptionHandler() {
  60. @Override
  61. protected void log(final String message, final Throwable e) {
  62. LoggerFactory.getLogger(ForgivingExceptionHandler.class).error("{} (Exception message: {)}", message, e.getMessage(), e);
  63. }
  64. });
  65. cf.setNetworkRecoveryInterval(5000L);
  66.  
  67. final Connection conn = createConnection(cf, Collections.singletonList(new Address(node1, 5672)), "RecoveryTestSub", consumerCount);
  68. for (int i = 0; i < consumerCount; i++) {
  69. final Channel channel = conn.createChannel();
  70. channel.basicQos(5);
  71. if (i == 0) {
  72. channel.queueDeclare(queueName, true, false, false, null);
  73. channel.queueBind(queueName, exchangeName, "test.routing.key.1");
  74. }
  75. channel.basicConsume(queueName, new DefaultConsumer(channel) {
  76. @Override
  77. public void handleDelivery(final String consumerTag, final Envelope envelope, final BasicProperties properties, final byte[] body) throws IOException {
  78. //handle message... sleep to simulate processing of message
  79. final long sleepTime = (long) (Math.random() * 10000L);
  80. logger.info("Got message with deliveryTag={}. Process time sleep={}", envelope.getDeliveryTag(), sleepTime);
  81. try {
  82. Thread.sleep(sleepTime);
  83. } catch (InterruptedException e) {
  84. }
  85. try {
  86. channel.basicAck(envelope.getDeliveryTag(), false);
  87. } catch (final AlreadyClosedException e) {
  88. logger.error("Error acking message for deliveryTag={}. {}", envelope.getDeliveryTag(), e.toString());
  89. } catch (final Exception e) {
  90. logger.error("Error acking message for deliveryTag={}", envelope.getDeliveryTag(), e);
  91. }
  92. logger.debug("finished ack");
  93. };
  94. });
  95. }
  96.  
  97. final AtomicBoolean keepRunning = new AtomicBoolean(true);
  98. Thread pubThread = new Thread(new Runnable() {
  99. @Override
  100. public void run() {
  101. Connection conn2 = null;
  102. try {
  103. conn2 = createConnection(cf, Collections.singletonList(new Address(node2, 5672)), "RecoveryTestPub", 1);
  104. final Channel channel = conn2.createChannel();
  105. while (keepRunning.get()) {
  106. try {
  107. final BasicProperties.Builder props = new BasicProperties.Builder();
  108. props.messageId(UUID.randomUUID().toString());
  109. props.contentType("binary/RawBytes");
  110. props.headers(Collections.<String, Object> singletonMap("testKey", "testValue"));
  111. props.priority(4);
  112. channel.basicPublish(exchangeName, "test.routing.key.1", props.build(), new byte[100]);
  113. Thread.sleep(100);
  114. } catch (final Exception e) {
  115. if (conn.isOpen())
  116. logger.error("Error publishing", e);
  117. }
  118. }
  119. } catch (final Exception e) {
  120. logger.error("Error publishing", e);
  121. } finally {
  122. if (conn2 != null)
  123. try {
  124. conn2.close(5000);
  125. } catch (IOException e) {
  126. logger.error("Error publishing", e);
  127. }
  128. }
  129.  
  130. }
  131. }, "PubThread");
  132. pubThread.setDaemon(true);
  133. pubThread.start();
  134.  
  135. // now just let this run while perform your node shutdown
  136. logger.info("Queues created. Hit any key to quit...");
  137. System.in.read();
  138. logger.info("Shutting down. Disconnecting connections...");
  139.  
  140. keepRunning.set(false);
  141. pubThread.join(10000);
  142. conn.close(10000);
  143. logger.info("Goodbye");
  144. System.exit(1);
  145. }
  146.  
  147. private static Connection createConnection(final ConnectionFactory cf, final List<Address> addresses, final String connectionName, final int consumerCount)
  148. throws Exception {
  149. // initially ensure the connections are balanced across the nodes. then during recovery open it up to all adresses
  150. final AtomicBoolean inRecovery = new AtomicBoolean();
  151. final Connection conn = cf.newConnection(Executors.newFixedThreadPool(consumerCount), addresses, connectionName);
  152. logger.info("connected to {}", conn.getAddress().getHostAddress());
  153. ((RecoverableConnection) conn).addRecoveryListener(new RecoveryListener() {
  154. @Override
  155. public void handleRecoveryStarted(final Recoverable recoverable) {
  156. inRecovery.set(true);
  157. logger.info("Begin Recovery on {}", connectionName);
  158. }
  159.  
  160. @Override
  161. public void handleRecovery(final Recoverable recoverable) {
  162. logger.info("Recovery finished on {}", connectionName);
  163. }
  164. });
  165. conn.addShutdownListener(new ShutdownListener() {
  166. @Override
  167. public void shutdownCompleted(final ShutdownSignalException cause) {
  168. if (cause.isInitiatedByApplication())
  169. return;
  170. if (cause.isHardError())
  171. logger.error("Error occurred on connection={}, reason={}", connectionName, cause.getReason());
  172. }
  173. });
  174. return conn;
  175. }
  176. }
Add Comment
Please, Sign In to add comment