Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import java.io.IOException;
- import java.util.Collections;
- import java.util.List;
- import java.util.UUID;
- import java.util.concurrent.Executors;
- import java.util.concurrent.atomic.AtomicBoolean;
- import com.rabbitmq.client.AMQP.BasicProperties;
- import com.rabbitmq.client.Address;
- import com.rabbitmq.client.AlreadyClosedException;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.DefaultConsumer;
- import com.rabbitmq.client.Envelope;
- import com.rabbitmq.client.Recoverable;
- import com.rabbitmq.client.RecoverableConnection;
- import com.rabbitmq.client.RecoveryListener;
- import com.rabbitmq.client.ShutdownListener;
- import com.rabbitmq.client.ShutdownSignalException;
- import com.rabbitmq.client.impl.ForgivingExceptionHandler;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- /**
- * This test is will recreate a recovery channel failure that causes the channel to be closed with reason: "reply-code=406,
- * reply-text=PRECONDITION_FAILED - unknown delivery tag 0, class-id=60, method-id=80)".
- *
- * When a thread tries to ack an in-flight message delivered on the old channel in the middle of recovery on the new channel an exception is raised.
- *
- * To recreate drop the Subscriber connection thru the management UI until the error is raised, the channels
- * error out, and messages stop being delivered. Normally it only takes 1 or 2 drops to recreate.
- */
- public class PreconditionFailedRecoveryTest {
- private static final Logger logger;
- static {
- System.setProperty("org.slf4j.simpleLogger.showDateTime", "true");
- System.setProperty("org.slf4j.simpleLogger.dateTimeFormat", "yyyy-MM-dd'T'HH:mm:ss.SSS");
- logger = LoggerFactory.getLogger(PreconditionFailedRecoveryTest.class); // I'm using slf4j-simple to make this go to stderr
- }
- public static void main(final String[] args) throws Exception {
- final String node1 = "node1";
- final String node2 = "node2";
- final String user = "user";
- final String pass = "pass";
- final String exchangeName = "ibus.topic.loadtest1";
- final String queueName = "ibus-d-s-loadtest1-RecoveryTest";
- final int consumerCount = 10;
- final ConnectionFactory cf = new ConnectionFactory();
- cf.setUsername(user);
- cf.setPassword(pass);
- cf.setAutomaticRecoveryEnabled(true);
- cf.setTopologyRecoveryEnabled(true);
- cf.setRequestedHeartbeat(30);
- cf.setExceptionHandler(new ForgivingExceptionHandler() {
- @Override
- protected void log(final String message, final Throwable e) {
- LoggerFactory.getLogger(ForgivingExceptionHandler.class).error("{} (Exception message: {)}", message, e.getMessage(), e);
- }
- });
- cf.setNetworkRecoveryInterval(5000L);
- final Connection conn = createConnection(cf, Collections.singletonList(new Address(node1, 5672)), "RecoveryTestSub", consumerCount);
- for (int i = 0; i < consumerCount; i++) {
- final Channel channel = conn.createChannel();
- channel.basicQos(5);
- if (i == 0) {
- channel.queueDeclare(queueName, true, false, false, null);
- channel.queueBind(queueName, exchangeName, "test.routing.key.1");
- }
- channel.basicConsume(queueName, new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(final String consumerTag, final Envelope envelope, final BasicProperties properties, final byte[] body) throws IOException {
- //handle message... sleep to simulate processing of message
- final long sleepTime = (long) (Math.random() * 10000L);
- logger.info("Got message with deliveryTag={}. Process time sleep={}", envelope.getDeliveryTag(), sleepTime);
- try {
- Thread.sleep(sleepTime);
- } catch (InterruptedException e) {
- }
- try {
- channel.basicAck(envelope.getDeliveryTag(), false);
- } catch (final AlreadyClosedException e) {
- logger.error("Error acking message for deliveryTag={}. {}", envelope.getDeliveryTag(), e.toString());
- } catch (final Exception e) {
- logger.error("Error acking message for deliveryTag={}", envelope.getDeliveryTag(), e);
- }
- logger.debug("finished ack");
- };
- });
- }
- final AtomicBoolean keepRunning = new AtomicBoolean(true);
- Thread pubThread = new Thread(new Runnable() {
- @Override
- public void run() {
- Connection conn2 = null;
- try {
- conn2 = createConnection(cf, Collections.singletonList(new Address(node2, 5672)), "RecoveryTestPub", 1);
- final Channel channel = conn2.createChannel();
- while (keepRunning.get()) {
- try {
- final BasicProperties.Builder props = new BasicProperties.Builder();
- props.messageId(UUID.randomUUID().toString());
- props.contentType("binary/RawBytes");
- props.headers(Collections.<String, Object> singletonMap("testKey", "testValue"));
- props.priority(4);
- channel.basicPublish(exchangeName, "test.routing.key.1", props.build(), new byte[100]);
- Thread.sleep(100);
- } catch (final Exception e) {
- if (conn.isOpen())
- logger.error("Error publishing", e);
- }
- }
- } catch (final Exception e) {
- logger.error("Error publishing", e);
- } finally {
- if (conn2 != null)
- try {
- conn2.close(5000);
- } catch (IOException e) {
- logger.error("Error publishing", e);
- }
- }
- }
- }, "PubThread");
- pubThread.setDaemon(true);
- pubThread.start();
- // now just let this run while perform your node shutdown
- logger.info("Queues created. Hit any key to quit...");
- System.in.read();
- logger.info("Shutting down. Disconnecting connections...");
- keepRunning.set(false);
- pubThread.join(10000);
- conn.close(10000);
- logger.info("Goodbye");
- System.exit(1);
- }
- private static Connection createConnection(final ConnectionFactory cf, final List<Address> addresses, final String connectionName, final int consumerCount)
- throws Exception {
- // initially ensure the connections are balanced across the nodes. then during recovery open it up to all adresses
- final AtomicBoolean inRecovery = new AtomicBoolean();
- final Connection conn = cf.newConnection(Executors.newFixedThreadPool(consumerCount), addresses, connectionName);
- logger.info("connected to {}", conn.getAddress().getHostAddress());
- ((RecoverableConnection) conn).addRecoveryListener(new RecoveryListener() {
- @Override
- public void handleRecoveryStarted(final Recoverable recoverable) {
- inRecovery.set(true);
- logger.info("Begin Recovery on {}", connectionName);
- }
- @Override
- public void handleRecovery(final Recoverable recoverable) {
- logger.info("Recovery finished on {}", connectionName);
- }
- });
- conn.addShutdownListener(new ShutdownListener() {
- @Override
- public void shutdownCompleted(final ShutdownSignalException cause) {
- if (cause.isInitiatedByApplication())
- return;
- if (cause.isHardError())
- logger.error("Error occurred on connection={}, reason={}", connectionName, cause.getReason());
- }
- });
- return conn;
- }
- }
Add Comment
Please, Sign In to add comment