Advertisement
Guest User

Untitled

a guest
Aug 26th, 2018
111
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 19.46 KB | None | 0 0
  1. public class RabbitMQClient
  2. {
  3. /// <summary>
  4. /// Rabbit mq host server
  5. /// </summary>
  6. private string mRabbitMQHost;
  7.  
  8. /// <summary>
  9. /// User of rabbit MQ
  10. /// </summary>
  11. private string mUser;
  12.  
  13. /// <summary>
  14. /// Password of rabbit mq
  15. /// </summary>
  16. private string mPassword;
  17.  
  18. /// <summary>
  19. /// Is SSL Required
  20. /// </summary>
  21. private bool mSSLEnabled;
  22.  
  23. /// <summary>
  24. /// rabbitMQ SSL Port
  25. /// </summary>
  26. private int mSSLPort;
  27.  
  28. /// <summary>
  29. /// rabbitMQ SSL Server Name
  30. /// </summary>
  31. private string mSSLServerName;
  32.  
  33. /// <summary>
  34. /// rabbitMQ SSL certificate path
  35. /// </summary>
  36. private string mSSLCertPath;
  37.  
  38. /// <summary>
  39. /// rabbitMQ SSL certificate passphrase
  40. /// </summary>
  41. private string mSSLCertPassphrase;
  42.  
  43. /// <summary>
  44. /// File path to XSD for XML validation
  45. /// </summary>
  46. private string mXSDFilePath;
  47. private XmlReader mXSDXMLReader;
  48.  
  49. /// <summary>
  50. /// Use ot not XSD validation against XML messages
  51. /// </summary>
  52. private bool mUseXSDValidation;
  53.  
  54. /// <summary>
  55. /// Connection properties for MQ
  56. /// </summary>
  57. private ConnectionFactory factory;
  58.  
  59. private IConnection mConnection;
  60. private IModel mChannel;
  61.  
  62. /// <summary>
  63. /// The state of the connection to the mQ
  64. /// </summary>
  65. private ConnectionState mState;
  66.  
  67. /// <summary>
  68. /// The property to define message persistency when sending to queue
  69. /// </summary>
  70. private bool mIsMessagePersistent;
  71.  
  72. /// <summary>
  73. /// The property to define expiration time to message when sending to queue
  74. /// </summary>
  75. private string mMessageExpire;
  76.  
  77. /// <summary>
  78. /// Changes the state of the connection
  79. /// </summary>
  80. private void SetState(ConnectionState pState)
  81. {
  82. if (mState != pState)
  83. {
  84. mState = pState;
  85. if (ConnectionStateChanged != null)
  86. ConnectionStateChanged(mState);
  87. }
  88. }
  89.  
  90. /// <summary>
  91. /// Hash set of all the queues names that are listened
  92. /// </summary>
  93. private ConcurrentDictionary<string, EventingBasicConsumer> mQueuesListeners;
  94.  
  95. /// <summary>
  96. /// Event which is called whenever a message is received
  97. /// </summary>
  98. public event RabbitMQMessageReceivedEventHandler MessageReceived;
  99.  
  100. /// <summary>
  101. /// Event which is called whenever the connection state to MQ is changed
  102. /// </summary>
  103. public event RabbitMQConnectionChangeStateEventHandler ConnectionStateChanged;
  104.  
  105. public delegate void RabbitMQMessageReceivedEventHandler(RabbitMQEventArgs pArgs);
  106. public delegate void RabbitMQConnectionChangeStateEventHandler(ConnectionState pState);
  107.  
  108. private Timer mConnectionWatchDog;
  109. private const int TIMER_INTERVAL = 10000;
  110.  
  111. private static readonly log4net.ILog log = log4net.LogManager.GetLogger("RabbitMQFileAppender");
  112.  
  113. public RabbitMQClient()
  114. {
  115. SetState(ConnectionState.Connecting);
  116. readConfig();
  117. factory = new ConnectionFactory() { UserName = mUser, Password = mPassword };
  118. factory.HostName = mRabbitMQHost;
  119.  
  120. // https://stackoverflow.com/questions/39642777/rabbitmq-c-sharp-ssl
  121. if (mSSLEnabled) {
  122.  
  123. //factory.AuthMechanisms = new AuthMechanismFactory[] { new ExternalMechanismFactory() };
  124. // Note: This should NEVER be "localhost"
  125. factory.Ssl.ServerName = mSSLServerName;
  126. // Path to my .p12 file.
  127. //factory.Ssl.CertPath = mSSLCertPath;
  128. // Passphrase for the certificate file - set through OpenSSL
  129. //factory.Ssl.CertPassphrase = mSSLCertPassphrase;
  130. factory.Ssl.Enabled = true;
  131. // Make sure TLS 1.2 is supported & enabled by your operating system
  132. factory.Ssl.Version = System.Security.Authentication.SslProtocols.Tls12;
  133. // This is the default RabbitMQ secure port
  134. factory.Port = mSSLPort;
  135. factory.VirtualHost = "/";
  136. }
  137.  
  138. // Connection that will recover automatically
  139. factory.AutomaticRecoveryEnabled = true;
  140. // Attempt recovery every 10 seconds
  141. factory.NetworkRecoveryInterval = TimeSpan.FromSeconds(10);
  142. // Topology recovery
  143. factory.TopologyRecoveryEnabled = true;
  144. // set the heartbeat timeout to 60 seconds
  145. factory.RequestedHeartbeat = 60;
  146.  
  147. // Get XSD file path
  148. string codeBase = Assembly.GetExecutingAssembly().CodeBase;
  149. UriBuilder uri = new UriBuilder(codeBase);
  150. string path = Uri.UnescapeDataString(uri.Path);
  151. mXSDFilePath = Path.Combine(Path.GetDirectoryName(path), "XSD", "ALMQEntities.xsd");
  152.  
  153. mState = ConnectionState.Disconnected;
  154.  
  155. // Start timer evey minute to check rabbitMQ connection state
  156. mConnectionWatchDog = new Timer(initConnection, null, TIMER_INTERVAL, Timeout.Infinite);
  157.  
  158. mQueuesListeners = new ConcurrentDictionary<string, EventingBasicConsumer>();
  159. }
  160.  
  161. private void initConnection(Object pState)
  162. {
  163. try
  164. {
  165. lock (mRabbitMQHost)
  166. {
  167. if (mConnection == null || !mConnection.IsOpen)
  168. {
  169. if (mChannel != null)
  170. {
  171. mChannel.Dispose();
  172. }
  173.  
  174. mConnection = factory.CreateConnection();
  175. mChannel = mConnection.CreateModel();
  176. SetState(ConnectionState.Connected);
  177. }
  178. }
  179. }
  180. catch (Exception ex)
  181. {
  182. log.Error("Error on connection initiation to rabbit MQ server " + mRabbitMQHost, ex);
  183. SetState(ConnectionState.Disconnected);
  184. }
  185. finally
  186. {
  187. mConnectionWatchDog.Change(TIMER_INTERVAL, Timeout.Infinite);
  188. }
  189. }
  190.  
  191.  
  192. /// <summary>
  193. /// Whether the connection to MQ is open
  194. /// </summary>
  195. public bool IsConnected()
  196. {
  197. return mConnection != null && mConnection.IsOpen;
  198. }
  199.  
  200. /// <summary>
  201. /// Read the config of rabbit mq
  202. /// </summary>
  203. private void readConfig()
  204. {
  205. KeyValueConfigurationCollection settingsKeyValuePair = getRabbitClientConfig();
  206.  
  207. mRabbitMQHost = settingsKeyValuePair["RabbitMQServer"].Value;
  208. mUser = settingsKeyValuePair["RabbitMQUser"].Value;
  209. mPassword = settingsKeyValuePair["RabbitMQPassword"].Value;
  210.  
  211. mSSLEnabled = Convert.ToBoolean(settingsKeyValuePair["RabbitMQSSLEnabled"].Value);
  212. mSSLPort = Convert.ToInt32(settingsKeyValuePair["RabbitMQSSLPort"].Value);
  213. mSSLServerName = settingsKeyValuePair["RabbitMQSSLServerName"].Value;
  214. mSSLCertPath = settingsKeyValuePair["RabbitMQSSLCertPath"].Value;
  215. mSSLCertPassphrase = settingsKeyValuePair["RabbitMQSSLCertPassphrase"].Value;
  216.  
  217. mIsMessagePersistent = Convert.ToBoolean(settingsKeyValuePair["MessagesPersistence"].Value);
  218. mMessageExpire = settingsKeyValuePair["MessagesExpire"].Value;
  219.  
  220. mUseXSDValidation = Convert.ToBoolean(settingsKeyValuePair["UseXSDValidation"].Value);
  221. }
  222.  
  223. public KeyValueConfigurationCollection getRabbitClientConfig()
  224. {
  225. Configuration dllConfig = ConfigurationManager.OpenExeConfiguration(typeof(RabbitMQClient).Assembly.Location);
  226.  
  227. return dllConfig.AppSettings.Settings;
  228. }
  229.  
  230. /// <summary>
  231. /// Sending message to a RabbitMQ Exchange
  232. /// </summary>
  233. /// <param name="pMessageToSend">Content of the message</param>
  234. /// <param name="pExchangeName">The exchange name configured in the Rabbit MQ Server to publish to</param>
  235. public void SendMessageToExchange(string pMessageToSend, string pExchangeName, string pRouteKey = "")
  236. {
  237. bool isConnected = false;
  238. lock (mRabbitMQHost)
  239. {
  240. if (mConnection != null)
  241. {
  242. isConnected = mConnection.IsOpen;
  243. }
  244. else return;
  245. }
  246.  
  247. // Drop message, if MQ is not connected
  248. if (!isConnected)
  249. {
  250. return;
  251. }
  252.  
  253. lock (factory)
  254. {
  255. _SendMessage(mChannel, false, pExchangeName, pRouteKey, "", pMessageToSend);
  256. }
  257. }
  258.  
  259. /// <summary>
  260. /// Sending mutliple messages to a RabbitMQ Exchnage
  261. /// </summary>
  262. /// <param name="pMessagesToSend"></param>
  263. /// <param name="pExchangeName"></param>
  264. public void SendMessagesToExchange(string[] pMessagesToSend, string pExchangeName, string pRouteKey = "")
  265. {
  266. bool isConnected = false;
  267. lock (mRabbitMQHost)
  268. {
  269. if (mConnection != null)
  270. {
  271. isConnected = mConnection.IsOpen;
  272. }
  273. else return;
  274. }
  275.  
  276. // Drop message, if MQ is not connected
  277. if (!isConnected)
  278. {
  279. return;
  280. }
  281.  
  282. if (pMessagesToSend != null && pMessagesToSend.Length > 0) {
  283.  
  284. // Create dedicated channel
  285. using (IModel _Channel = mConnection.CreateModel())
  286. {
  287. foreach (var msg in pMessagesToSend)
  288. {
  289. lock (factory)
  290. {
  291. _SendMessage(_Channel, false, pExchangeName, pRouteKey, "", msg);
  292. }
  293. }
  294. }
  295. }
  296. }
  297.  
  298. /// <summary>
  299. /// Sending message to a RabbitMQ Queue
  300. /// </summary>
  301. /// <param name="pMessageToSend"></param>
  302. /// <param name="pQueueName"></param>
  303. public void SendMessageToQueue(string pMessageToSend, string pQueueName)
  304. {
  305. bool isConnected = false;
  306. lock (mRabbitMQHost)
  307. {
  308. if (mConnection != null)
  309. {
  310. isConnected = mConnection.IsOpen;
  311. }
  312. else return;
  313. }
  314.  
  315. // Drop message, if MQ is not connected
  316. if (!isConnected)
  317. {
  318. return;
  319. }
  320.  
  321. lock (factory)
  322. {
  323. _SendMessage(mChannel, true, "", "", pQueueName, pMessageToSend);
  324. }
  325. }
  326.  
  327. /// <summary>
  328. /// Sending mutliple messages to a RabbitMQ Queue
  329. /// </summary>
  330. /// <param name="pMessagesToSend"></param>
  331. /// <param name="pQueueName"></param>
  332. public void SendMessagesToQueue(string[] pMessagesToSend, string pQueueName)
  333. {
  334. bool isConnected = false;
  335. lock (mRabbitMQHost)
  336. {
  337. if (mConnection != null)
  338. {
  339. isConnected = mConnection.IsOpen;
  340. }
  341. else return;
  342. }
  343.  
  344. // Drop message, if MQ is not connected
  345. if (!isConnected)
  346. {
  347. return;
  348. }
  349.  
  350. if (pMessagesToSend != null && pMessagesToSend.Length > 0)
  351. {
  352.  
  353. // Create dedicated channel
  354. using (IModel _Channel = mConnection.CreateModel())
  355. {
  356. foreach (var msg in pMessagesToSend)
  357. {
  358. lock (factory)
  359. {
  360. _SendMessage(_Channel, true, "", "", pQueueName, msg);
  361. }
  362. }
  363. }
  364. }
  365. }
  366.  
  367. /// <summary>
  368. /// Receive messages from a queue defined in RabbitMQ Server, will make the thread wait until a message is received
  369. /// </summary>
  370. /// <param name="pQueueName">The queue to listen on</param>
  371. public bool ListenOnQueue(string pQueueName)
  372. {
  373. bool isConnected = false;
  374. lock (mRabbitMQHost)
  375. {
  376. if (mConnection != null)
  377. {
  378. isConnected = mConnection.IsOpen;
  379. }
  380. else return false;
  381. }
  382.  
  383. // Drop message, if MQ is not connected
  384. if (!isConnected)
  385. {
  386. return false;
  387. }
  388.  
  389. // Check if there was already listener instantiated
  390. if (mQueuesListeners.ContainsKey(pQueueName))
  391. {
  392. return false;
  393. }
  394.  
  395. lock (factory)
  396. {
  397. try
  398. {
  399. EventingBasicConsumer consumer = new EventingBasicConsumer(mChannel);
  400. consumer.Received += (object sender, BasicDeliverEventArgs args) => MessageReceivedEvent(pQueueName, args);
  401. mChannel.BasicConsume(queue: pQueueName,
  402. autoAck: true,
  403. consumer: consumer);
  404.  
  405. mQueuesListeners.TryAdd(pQueueName, consumer);
  406.  
  407. return true;
  408. }
  409. catch (Exception ex)
  410. {
  411. SetState(ConnectionState.Retrying);
  412. log.Error("Error reading message from queue " + pQueueName + " from rabbit MQ server " + mRabbitMQHost, ex);
  413. return false;
  414. }
  415. }
  416. }
  417.  
  418. /// <summary>
  419. /// Unlisten to specific queue
  420. /// </summary>
  421. /// <param name="pQueueName"></param>
  422. /// <returns></returns>
  423. public bool UnListenOnQueue(string pQueueName)
  424. {
  425. lock (factory)
  426. {
  427. try
  428. {
  429. EventingBasicConsumer consumer;
  430. if (mQueuesListeners.TryGetValue(pQueueName, out consumer))
  431. {
  432. mChannel.BasicCancel(consumer.ConsumerTag);
  433.  
  434. mQueuesListeners.TryRemove(pQueueName, out consumer);
  435.  
  436. return true;
  437. }
  438. else {
  439. return false;
  440. }
  441. }
  442. catch (Exception ex)
  443. {
  444. SetState(ConnectionState.Retrying);
  445. log.Error("Error reading message from queue " + pQueueName + " from rabbit MQ server " + mRabbitMQHost, ex);
  446. return false;
  447. }
  448. }
  449. }
  450.  
  451. public int GetQueueMessages(string pQueueName)
  452. {
  453. int msgNum = -1;
  454.  
  455. try
  456. {
  457. msgNum = (int)mChannel.MessageCount(pQueueName);
  458. }
  459. catch (Exception ex) {
  460. log.Error("Error getting number of messages from queue: " + pQueueName, ex);
  461. }
  462.  
  463. return msgNum;
  464. }
  465.  
  466. /// <summary>
  467. /// Called whenever any listened queue receives a message
  468. /// </summary>
  469. /// <param name="pQueueName">The queue which recevied a message</param>
  470. /// <param name="pArgs">Rabbit mq arguments</param>
  471. private void MessageReceivedEvent(string pQueueName, BasicDeliverEventArgs pArgs)
  472. {
  473. if (MessageReceived != null)
  474. {
  475. RabbitMQEventArgs args = new RabbitMQEventArgs();
  476. args.Body = pArgs.Body;
  477. args.Queue = pQueueName;
  478.  
  479. MessageReceived(args);
  480. }
  481. }
  482.  
  483. /// <summary>
  484. ///
  485. /// </summary>
  486. /// <param name="pIsQueue"></param>
  487. /// <param name="pExchangeName"></param>
  488. /// <param name="pRouteName"></param>
  489. /// <param name="pQueueName"></param>
  490. /// <param name="pMsg"></param>
  491. private void _SendMessage(IModel pMQChannel, bool pIsQueue, string pExchangeName, string pRouteName, string pQueueName, string pMsg)
  492. {
  493. try
  494. {
  495. if (!mUseXSDValidation || ValidateXML(pMsg))
  496. {
  497. byte[] messageBytes = Encoding.UTF8.GetBytes(pMsg);
  498.  
  499. // Convert the message into bytes
  500. IBasicProperties props = pMQChannel.CreateBasicProperties();
  501. props.ContentType = "text/plain";
  502. props.DeliveryMode = 2; // Persistent message
  503. props.Expiration = mMessageExpire;
  504. props.Persistent = mIsMessagePersistent;
  505.  
  506. pMQChannel.BasicPublish(exchange: pIsQueue ? "" : pExchangeName,
  507. routingKey: pIsQueue ? pQueueName : pRouteName,
  508. basicProperties: props,
  509. body: messageBytes);
  510. }
  511. else
  512. {
  513. log.Error("Original XML message:");
  514. log.Error("---------------------");
  515. log.Error(pMsg);
  516. }
  517.  
  518. }
  519. catch (Exception ex)
  520. {
  521. SetState(ConnectionState.Retrying);
  522. log.Error("Error sending message to exchange " + pExchangeName + " in rabbit MQ server " + mRabbitMQHost, ex);
  523. }
  524. }
  525.  
  526. private bool ValidateXML(string pXMLContent)
  527. {
  528. bool result = true;
  529.  
  530. XDocument xdoc = XDocument.Parse(pXMLContent, LoadOptions.None);
  531.  
  532. if (File.Exists(mXSDFilePath))
  533. {
  534. mXSDXMLReader = XmlReader.Create(mXSDFilePath);
  535. }
  536.  
  537. XmlSchemaSet schemas = new XmlSchemaSet();
  538. schemas.Add("", mXSDXMLReader);
  539.  
  540. xdoc.Validate(schemas, (o, e) =>
  541. {
  542. log.Error("Error validating xml message", new Exception(e.Message));
  543. result = false;
  544. });
  545.  
  546. return result;
  547. }
  548.  
  549. public void Dispose()
  550. {
  551. if (mConnectionWatchDog != null) {
  552. mConnectionWatchDog.Dispose();
  553. }
  554.  
  555. if(mConnection != null && mConnection.IsOpen)
  556. {
  557. try
  558. {
  559. mConnection.Close();
  560. }
  561. catch(Exception e)
  562. {
  563. // If failed to close connection, don't mind, it's probably closed
  564. }
  565. }
  566. }
  567. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement