Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- public void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId, ActiveMQMessage advisoryMessage) throws Exception {
- if (getBrokerService().isStarted()) {
- //set properties
- advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_NAME, getBrokerName());
- String id = getBrokerId() != null ? getBrokerId().getValue() : "NOT_SET";
- advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID, id);
- String url = getBrokerService().getVmConnectorURI().toString();
- if (getBrokerService().getDefaultSocketURIString() != null) {
- url = getBrokerService().getDefaultSocketURIString();
- }
- advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL, url);
- //set the data structure
- advisoryMessage.setDataStructure(command);
- advisoryMessage.setPersistent(false);
- advisoryMessage.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
- advisoryMessage.setMessageId(new MessageId(advisoryProducerId, messageIdGenerator.getNextSequenceId()));
- advisoryMessage.setTargetConsumerId(targetConsumerId);
- advisoryMessage.setDestination(topic);
- advisoryMessage.setResponseRequired(false);
- advisoryMessage.setProducerId(advisoryProducerId);
- boolean originalFlowControl = context.isProducerFlowControl();
- final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
- producerExchange.setConnectionContext(context);
- producerExchange.setMutable(true);
- producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
- try {
- context.setProducerFlowControl(false);
- next.send(producerExchange, advisoryMessage);
- } finally {
- context.setProducerFlowControl(originalFlowControl);
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment