Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- public class RabbitMQClient
- {
- /// <summary>
- /// Rabbit mq host server
- /// </summary>
- private string mRabbitMQHost;
- /// <summary>
- /// User of rabbit MQ
- /// </summary>
- private string mUser;
- /// <summary>
- /// Password of rabbit mq
- /// </summary>
- private string mPassword;
- /// <summary>
- /// Is SSL Required
- /// </summary>
- private bool mSSLEnabled;
- /// <summary>
- /// rabbitMQ SSL Port
- /// </summary>
- private int mSSLPort;
- /// <summary>
- /// rabbitMQ SSL Server Name
- /// </summary>
- private string mSSLServerName;
- /// <summary>
- /// rabbitMQ SSL certificate path
- /// </summary>
- private string mSSLCertPath;
- /// <summary>
- /// rabbitMQ SSL certificate passphrase
- /// </summary>
- private string mSSLCertPassphrase;
- /// <summary>
- /// File path to XSD for XML validation
- /// </summary>
- private string mXSDFilePath;
- private XmlReader mXSDXMLReader;
- /// <summary>
- /// Use ot not XSD validation against XML messages
- /// </summary>
- private bool mUseXSDValidation;
- /// <summary>
- /// Connection properties for MQ
- /// </summary>
- private ConnectionFactory factory;
- private IConnection mConnection;
- private IModel mChannel;
- /// <summary>
- /// The state of the connection to the mQ
- /// </summary>
- private ConnectionState mState;
- /// <summary>
- /// The property to define message persistency when sending to queue
- /// </summary>
- private bool mIsMessagePersistent;
- /// <summary>
- /// The property to define expiration time to message when sending to queue
- /// </summary>
- private string mMessageExpire;
- /// <summary>
- /// Changes the state of the connection
- /// </summary>
- private void SetState(ConnectionState pState)
- {
- if (mState != pState)
- {
- mState = pState;
- if (ConnectionStateChanged != null)
- ConnectionStateChanged(mState);
- }
- }
- /// <summary>
- /// Hash set of all the queues names that are listened
- /// </summary>
- private ConcurrentDictionary<string, EventingBasicConsumer> mQueuesListeners;
- /// <summary>
- /// Event which is called whenever a message is received
- /// </summary>
- public event RabbitMQMessageReceivedEventHandler MessageReceived;
- /// <summary>
- /// Event which is called whenever the connection state to MQ is changed
- /// </summary>
- public event RabbitMQConnectionChangeStateEventHandler ConnectionStateChanged;
- public delegate void RabbitMQMessageReceivedEventHandler(RabbitMQEventArgs pArgs);
- public delegate void RabbitMQConnectionChangeStateEventHandler(ConnectionState pState);
- private Timer mConnectionWatchDog;
- private const int TIMER_INTERVAL = 10000;
- private static readonly log4net.ILog log = log4net.LogManager.GetLogger("RabbitMQFileAppender");
- public RabbitMQClient()
- {
- SetState(ConnectionState.Connecting);
- readConfig();
- factory = new ConnectionFactory() { UserName = mUser, Password = mPassword };
- factory.HostName = mRabbitMQHost;
- // https://stackoverflow.com/questions/39642777/rabbitmq-c-sharp-ssl
- if (mSSLEnabled) {
- //factory.AuthMechanisms = new AuthMechanismFactory[] { new ExternalMechanismFactory() };
- // Note: This should NEVER be "localhost"
- factory.Ssl.ServerName = mSSLServerName;
- // Path to my .p12 file.
- //factory.Ssl.CertPath = mSSLCertPath;
- // Passphrase for the certificate file - set through OpenSSL
- //factory.Ssl.CertPassphrase = mSSLCertPassphrase;
- factory.Ssl.Enabled = true;
- // Make sure TLS 1.2 is supported & enabled by your operating system
- factory.Ssl.Version = System.Security.Authentication.SslProtocols.Tls12;
- // This is the default RabbitMQ secure port
- factory.Port = mSSLPort;
- factory.VirtualHost = "/";
- }
- // Connection that will recover automatically
- factory.AutomaticRecoveryEnabled = true;
- // Attempt recovery every 10 seconds
- factory.NetworkRecoveryInterval = TimeSpan.FromSeconds(10);
- // Topology recovery
- factory.TopologyRecoveryEnabled = true;
- // set the heartbeat timeout to 60 seconds
- factory.RequestedHeartbeat = 60;
- // Get XSD file path
- string codeBase = Assembly.GetExecutingAssembly().CodeBase;
- UriBuilder uri = new UriBuilder(codeBase);
- string path = Uri.UnescapeDataString(uri.Path);
- mXSDFilePath = Path.Combine(Path.GetDirectoryName(path), "XSD", "ALMQEntities.xsd");
- mState = ConnectionState.Disconnected;
- // Start timer evey minute to check rabbitMQ connection state
- mConnectionWatchDog = new Timer(initConnection, null, TIMER_INTERVAL, Timeout.Infinite);
- mQueuesListeners = new ConcurrentDictionary<string, EventingBasicConsumer>();
- }
- private void initConnection(Object pState)
- {
- try
- {
- lock (mRabbitMQHost)
- {
- if (mConnection == null || !mConnection.IsOpen)
- {
- if (mChannel != null)
- {
- mChannel.Dispose();
- }
- mConnection = factory.CreateConnection();
- mChannel = mConnection.CreateModel();
- SetState(ConnectionState.Connected);
- }
- }
- }
- catch (Exception ex)
- {
- log.Error("Error on connection initiation to rabbit MQ server " + mRabbitMQHost, ex);
- SetState(ConnectionState.Disconnected);
- }
- finally
- {
- mConnectionWatchDog.Change(TIMER_INTERVAL, Timeout.Infinite);
- }
- }
- /// <summary>
- /// Whether the connection to MQ is open
- /// </summary>
- public bool IsConnected()
- {
- return mConnection != null && mConnection.IsOpen;
- }
- /// <summary>
- /// Read the config of rabbit mq
- /// </summary>
- private void readConfig()
- {
- KeyValueConfigurationCollection settingsKeyValuePair = getRabbitClientConfig();
- mRabbitMQHost = settingsKeyValuePair["RabbitMQServer"].Value;
- mUser = settingsKeyValuePair["RabbitMQUser"].Value;
- mPassword = settingsKeyValuePair["RabbitMQPassword"].Value;
- mSSLEnabled = Convert.ToBoolean(settingsKeyValuePair["RabbitMQSSLEnabled"].Value);
- mSSLPort = Convert.ToInt32(settingsKeyValuePair["RabbitMQSSLPort"].Value);
- mSSLServerName = settingsKeyValuePair["RabbitMQSSLServerName"].Value;
- mSSLCertPath = settingsKeyValuePair["RabbitMQSSLCertPath"].Value;
- mSSLCertPassphrase = settingsKeyValuePair["RabbitMQSSLCertPassphrase"].Value;
- mIsMessagePersistent = Convert.ToBoolean(settingsKeyValuePair["MessagesPersistence"].Value);
- mMessageExpire = settingsKeyValuePair["MessagesExpire"].Value;
- mUseXSDValidation = Convert.ToBoolean(settingsKeyValuePair["UseXSDValidation"].Value);
- }
- public KeyValueConfigurationCollection getRabbitClientConfig()
- {
- Configuration dllConfig = ConfigurationManager.OpenExeConfiguration(typeof(RabbitMQClient).Assembly.Location);
- return dllConfig.AppSettings.Settings;
- }
- /// <summary>
- /// Sending message to a RabbitMQ Exchange
- /// </summary>
- /// <param name="pMessageToSend">Content of the message</param>
- /// <param name="pExchangeName">The exchange name configured in the Rabbit MQ Server to publish to</param>
- public void SendMessageToExchange(string pMessageToSend, string pExchangeName, string pRouteKey = "")
- {
- bool isConnected = false;
- lock (mRabbitMQHost)
- {
- if (mConnection != null)
- {
- isConnected = mConnection.IsOpen;
- }
- else return;
- }
- // Drop message, if MQ is not connected
- if (!isConnected)
- {
- return;
- }
- lock (factory)
- {
- _SendMessage(mChannel, false, pExchangeName, pRouteKey, "", pMessageToSend);
- }
- }
- /// <summary>
- /// Sending mutliple messages to a RabbitMQ Exchnage
- /// </summary>
- /// <param name="pMessagesToSend"></param>
- /// <param name="pExchangeName"></param>
- public void SendMessagesToExchange(string[] pMessagesToSend, string pExchangeName, string pRouteKey = "")
- {
- bool isConnected = false;
- lock (mRabbitMQHost)
- {
- if (mConnection != null)
- {
- isConnected = mConnection.IsOpen;
- }
- else return;
- }
- // Drop message, if MQ is not connected
- if (!isConnected)
- {
- return;
- }
- if (pMessagesToSend != null && pMessagesToSend.Length > 0) {
- // Create dedicated channel
- using (IModel _Channel = mConnection.CreateModel())
- {
- foreach (var msg in pMessagesToSend)
- {
- lock (factory)
- {
- _SendMessage(_Channel, false, pExchangeName, pRouteKey, "", msg);
- }
- }
- }
- }
- }
- /// <summary>
- /// Sending message to a RabbitMQ Queue
- /// </summary>
- /// <param name="pMessageToSend"></param>
- /// <param name="pQueueName"></param>
- public void SendMessageToQueue(string pMessageToSend, string pQueueName)
- {
- bool isConnected = false;
- lock (mRabbitMQHost)
- {
- if (mConnection != null)
- {
- isConnected = mConnection.IsOpen;
- }
- else return;
- }
- // Drop message, if MQ is not connected
- if (!isConnected)
- {
- return;
- }
- lock (factory)
- {
- _SendMessage(mChannel, true, "", "", pQueueName, pMessageToSend);
- }
- }
- /// <summary>
- /// Sending mutliple messages to a RabbitMQ Queue
- /// </summary>
- /// <param name="pMessagesToSend"></param>
- /// <param name="pQueueName"></param>
- public void SendMessagesToQueue(string[] pMessagesToSend, string pQueueName)
- {
- bool isConnected = false;
- lock (mRabbitMQHost)
- {
- if (mConnection != null)
- {
- isConnected = mConnection.IsOpen;
- }
- else return;
- }
- // Drop message, if MQ is not connected
- if (!isConnected)
- {
- return;
- }
- if (pMessagesToSend != null && pMessagesToSend.Length > 0)
- {
- // Create dedicated channel
- using (IModel _Channel = mConnection.CreateModel())
- {
- foreach (var msg in pMessagesToSend)
- {
- lock (factory)
- {
- _SendMessage(_Channel, true, "", "", pQueueName, msg);
- }
- }
- }
- }
- }
- /// <summary>
- /// Receive messages from a queue defined in RabbitMQ Server, will make the thread wait until a message is received
- /// </summary>
- /// <param name="pQueueName">The queue to listen on</param>
- public bool ListenOnQueue(string pQueueName)
- {
- bool isConnected = false;
- lock (mRabbitMQHost)
- {
- if (mConnection != null)
- {
- isConnected = mConnection.IsOpen;
- }
- else return false;
- }
- // Drop message, if MQ is not connected
- if (!isConnected)
- {
- return false;
- }
- // Check if there was already listener instantiated
- if (mQueuesListeners.ContainsKey(pQueueName))
- {
- return false;
- }
- lock (factory)
- {
- try
- {
- EventingBasicConsumer consumer = new EventingBasicConsumer(mChannel);
- consumer.Received += (object sender, BasicDeliverEventArgs args) => MessageReceivedEvent(pQueueName, args);
- mChannel.BasicConsume(queue: pQueueName,
- autoAck: true,
- consumer: consumer);
- mQueuesListeners.TryAdd(pQueueName, consumer);
- return true;
- }
- catch (Exception ex)
- {
- SetState(ConnectionState.Retrying);
- log.Error("Error reading message from queue " + pQueueName + " from rabbit MQ server " + mRabbitMQHost, ex);
- return false;
- }
- }
- }
- /// <summary>
- /// Unlisten to specific queue
- /// </summary>
- /// <param name="pQueueName"></param>
- /// <returns></returns>
- public bool UnListenOnQueue(string pQueueName)
- {
- lock (factory)
- {
- try
- {
- EventingBasicConsumer consumer;
- if (mQueuesListeners.TryGetValue(pQueueName, out consumer))
- {
- mChannel.BasicCancel(consumer.ConsumerTag);
- mQueuesListeners.TryRemove(pQueueName, out consumer);
- return true;
- }
- else {
- return false;
- }
- }
- catch (Exception ex)
- {
- SetState(ConnectionState.Retrying);
- log.Error("Error reading message from queue " + pQueueName + " from rabbit MQ server " + mRabbitMQHost, ex);
- return false;
- }
- }
- }
- public int GetQueueMessages(string pQueueName)
- {
- int msgNum = -1;
- try
- {
- msgNum = (int)mChannel.MessageCount(pQueueName);
- }
- catch (Exception ex) {
- log.Error("Error getting number of messages from queue: " + pQueueName, ex);
- }
- return msgNum;
- }
- /// <summary>
- /// Called whenever any listened queue receives a message
- /// </summary>
- /// <param name="pQueueName">The queue which recevied a message</param>
- /// <param name="pArgs">Rabbit mq arguments</param>
- private void MessageReceivedEvent(string pQueueName, BasicDeliverEventArgs pArgs)
- {
- if (MessageReceived != null)
- {
- RabbitMQEventArgs args = new RabbitMQEventArgs();
- args.Body = pArgs.Body;
- args.Queue = pQueueName;
- MessageReceived(args);
- }
- }
- /// <summary>
- ///
- /// </summary>
- /// <param name="pIsQueue"></param>
- /// <param name="pExchangeName"></param>
- /// <param name="pRouteName"></param>
- /// <param name="pQueueName"></param>
- /// <param name="pMsg"></param>
- private void _SendMessage(IModel pMQChannel, bool pIsQueue, string pExchangeName, string pRouteName, string pQueueName, string pMsg)
- {
- try
- {
- if (!mUseXSDValidation || ValidateXML(pMsg))
- {
- byte[] messageBytes = Encoding.UTF8.GetBytes(pMsg);
- // Convert the message into bytes
- IBasicProperties props = pMQChannel.CreateBasicProperties();
- props.ContentType = "text/plain";
- props.DeliveryMode = 2; // Persistent message
- props.Expiration = mMessageExpire;
- props.Persistent = mIsMessagePersistent;
- pMQChannel.BasicPublish(exchange: pIsQueue ? "" : pExchangeName,
- routingKey: pIsQueue ? pQueueName : pRouteName,
- basicProperties: props,
- body: messageBytes);
- }
- else
- {
- log.Error("Original XML message:");
- log.Error("---------------------");
- log.Error(pMsg);
- }
- }
- catch (Exception ex)
- {
- SetState(ConnectionState.Retrying);
- log.Error("Error sending message to exchange " + pExchangeName + " in rabbit MQ server " + mRabbitMQHost, ex);
- }
- }
- private bool ValidateXML(string pXMLContent)
- {
- bool result = true;
- XDocument xdoc = XDocument.Parse(pXMLContent, LoadOptions.None);
- if (File.Exists(mXSDFilePath))
- {
- mXSDXMLReader = XmlReader.Create(mXSDFilePath);
- }
- XmlSchemaSet schemas = new XmlSchemaSet();
- schemas.Add("", mXSDXMLReader);
- xdoc.Validate(schemas, (o, e) =>
- {
- log.Error("Error validating xml message", new Exception(e.Message));
- result = false;
- });
- return result;
- }
- public void Dispose()
- {
- if (mConnectionWatchDog != null) {
- mConnectionWatchDog.Dispose();
- }
- if(mConnection != null && mConnection.IsOpen)
- {
- try
- {
- mConnection.Close();
- }
- catch(Exception e)
- {
- // If failed to close connection, don't mind, it's probably closed
- }
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement