Guest User

Untitled

a guest
Aug 31st, 2018
116
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 22.83 KB | None | 0 0
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Net.Security;
  4. using System.Reactive.Concurrency;
  5. using System.Reactive.Disposables;
  6. using System.Reactive.Linq;
  7. using System.Reactive.Threading.Tasks;
  8. using System.Text;
  9. using System.Threading.Tasks;
  10. using Extend;
  11. using RabbitMQ.Client;
  12. using VRSK.UW.Common.Messaging.Extensions;
  13. using Genesis.Ensure;
  14. using RabbitMQ.Client.Events;
  15. using RabbitMQ.Client.Framing;
  16. using VRSK.UW.Common.Messaging.Models;
  17. using System.Collections.Concurrent;
  18. using System.Linq;
  19. using System.Threading;
  20. using log4net;
  21. using System.Reflection;
  22. using VRSK.UW.Infrastructure.Models;
  23. using VRSK.UW.Infrastructure.Extensions;
  24.  
  25. namespace VRSK.UW.Common.Messaging.Services.RabbitMQ
  26. {
  27. public class RabbitMqMessagingServiceObservable : IMessagingService
  28. {
  /// <summary>Connection contexts tracked by this service; static, so shared by every instance.</summary>
  protected static BlockingCollection<RabbitMqConnectionContext> Connections =
      new BlockingCollection<RabbitMqConnectionContext>();

  /// <summary>log4net logger used for the diagnostic messages written by this class.</summary>
  private readonly ILog Logger;

  /// <summary>Lock taken (with <see cref="LockKey"/>) while a connection is looked up or created.</summary>
  private readonly AsyncDuplicateLock AsyncLockObject = new AsyncDuplicateLock();

  /// <summary>Key passed to <see cref="AsyncLockObject"/> when locking.</summary>
  private readonly string LockKey = "RabbitMqMessagingService";

  /// <summary>Exchange name passed to BasicPublish.</summary>
  protected readonly string Exchange;

  /// <summary>Queue that is declared, consumed from and published to.</summary>
  protected readonly string QueueName;

  /// <summary>Routing key set by the constructors; falls back to the queue name when none is given.</summary>
  protected readonly string RoutingKey;

  /// <summary>Retry count handed to RetryWithBackoff.</summary>
  protected readonly int RetryCount;

  /// <summary>Message properties passed to BasicPublish.</summary>
  protected readonly IBasicProperties BasicProperties;

  /// <summary>Timeout passed to WaitForConfirms after publishing (a TimeSpan, despite the name).</summary>
  protected readonly TimeSpan TimeoutExpirationInMilliseconds;

  /// <summary>Interval returned to RetryWithBackoff for every attempt (a TimeSpan, despite the name).</summary>
  protected readonly TimeSpan RetryIntervalInMilliseconds;

  /// <summary>Channels and connection contexts owned by this instance; disposed in Dispose(bool).</summary>
  protected readonly CompositeDisposable Disposables = new CompositeDisposable();

  /// <summary>Encoding used to decode received bodies and to serialize published payloads.</summary>
  protected readonly Encoding MessageEncoding;

  /// <summary>Scheduler handed to RetryWithBackoff.</summary>
  protected readonly IScheduler WorkScheduler;

  /// <summary>Optional endpoint resolver factory applied to each created connection factory when not null.</summary>
  protected readonly Func<IEnumerable<AmqpTcpEndpoint>, IEndpointResolver> EndpointResolverFactoryFunc;

  /// <summary>One connection factory per configured host name; tried in order when connecting.</summary>
  protected virtual IReadOnlyCollection<IConnectionFactory> ConnectionFactories { get; private set; }
  46.  
  /// <summary>
  /// Creates the service from a <see cref="RabbitMqOptions"/> instance.
  /// </summary>
  /// <param name="options">
  /// Connection and messaging settings. When <c>Ssl</c> or <c>BasicProperties</c> is null, a default
  /// value is assigned onto this same options instance before it is used.
  /// </param>
  /// <param name="logger">Logger to use; when null, the log4net logger for this type is resolved.</param>
  public RabbitMqMessagingServiceObservable(
      RabbitMqOptions options,
      ILog logger = null)
  {
      Ensure.ArgumentNotNull(options, nameof(options));

      // Use the injected logger. The field is still unassigned here, so coalescing the field
      // with itself would always discard the caller's logger.
      Logger = logger ?? LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);

      if (options.Ssl.IsNull())
      {
          // NOTE(review): tolerating name-mismatch and chain errors relaxes server certificate
          // validation - confirm this is intended outside of test environments.
          options.Ssl = new SslOption
          {
              ServerName = options.SslServerName,
              Enabled = true,
              AcceptablePolicyErrors = SslPolicyErrors.RemoteCertificateNameMismatch |
                                       SslPolicyErrors.RemoteCertificateChainErrors
          };
      }

      if (options.BasicProperties.IsNull())
      {
          // Default to persistent delivery.
          options.BasicProperties = new BasicProperties
          {
              Persistent = true,
              DeliveryMode = 2
          };
      }

      Exchange = options.Exchange;
      RetryCount = options.RetryCount;
      RetryIntervalInMilliseconds = options.RetryIntervalInMilliseconds;
      QueueName = options.QueueName;
      // Without an explicit routing key the queue name doubles as the routing key.
      RoutingKey = options.RoutingKey ?? options.QueueName;
      BasicProperties = options.BasicProperties;
      TimeoutExpirationInMilliseconds = options.TimeoutExpirationInMilliseconds;
      MessageEncoding = options.MessagEncoding;
      WorkScheduler = options.WorkScheduler;

      // One factory per host; they are tried in order when a connection is needed.
      ConnectionFactories = options.HostNames
          .Select(hostName => CreateConnectionFactoryWithOptions(hostName, options))
          .ToReadOnlyCollection();
  }
  88.  
  /// <summary>
  /// Creates the service from individual connection settings, using built-in defaults for
  /// every optional argument that is left null.
  /// </summary>
  /// <param name="hostNames">Broker host names; one connection factory is created per entry.</param>
  /// <param name="port">Broker port used for every host.</param>
  /// <param name="virtualHost">Virtual host to connect to.</param>
  /// <param name="sslServerName">Server name placed in the SSL options.</param>
  /// <param name="username">Broker user name.</param>
  /// <param name="password">Broker password.</param>
  /// <param name="queueName">Queue to declare, consume from and publish to.</param>
  /// <param name="routingKey">Routing key; defaults to <paramref name="queueName"/> when null.</param>
  /// <param name="exchange">Exchange name; defaults to the empty string.</param>
  /// <param name="basicProperties">Publish properties; defaults to persistent delivery (mode 2).</param>
  /// <param name="timeoutExpirationInMilliseconds">Publisher-confirm timeout; defaults to 300 ms.</param>
  /// <param name="retryCount">Retry count handed to RetryWithBackoff.</param>
  /// <param name="retryIntervalInMilliseconds">Retry interval; defaults to 200 ms.</param>
  /// <param name="encoding">Message encoding; defaults to UTF-8.</param>
  /// <param name="workScheduler">Scheduler for retries; defaults to <c>DefaultScheduler.Instance</c>.</param>
  /// <param name="endpointResolverFactoryFunc">Optional endpoint resolver factory for the connection factories.</param>
  /// <param name="logger">Logger to use; when null, the log4net logger for this type is resolved.</param>
  public RabbitMqMessagingServiceObservable(
      string[] hostNames,
      int port,
      string virtualHost,
      string sslServerName,
      string username,
      string password,
      string queueName,
      string routingKey = null,
      string exchange = "",
      IBasicProperties basicProperties = null,
      TimeSpan? timeoutExpirationInMilliseconds = null,
      int retryCount = 5,
      TimeSpan? retryIntervalInMilliseconds = null,
      Encoding encoding = null,
      IScheduler workScheduler = null,
      Func<IEnumerable<AmqpTcpEndpoint>, IEndpointResolver> endpointResolverFactoryFunc = null,
      ILog logger = null)
  {
      Ensure.ArgumentNotNull(hostNames, nameof(hostNames));
      Ensure.ArgumentNotNull(virtualHost, nameof(virtualHost));
      Ensure.ArgumentNotNull(sslServerName, nameof(sslServerName));
      Ensure.ArgumentNotNull(username, nameof(username));
      Ensure.ArgumentNotNull(password, nameof(password));
      Ensure.ArgumentNotNull(queueName, nameof(queueName));

      // Use the injected logger. The field is still unassigned here, so coalescing the field
      // with itself would always discard the caller's logger.
      Logger = logger ?? LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
      Exchange = exchange;
      RetryCount = retryCount;
      RetryIntervalInMilliseconds = retryIntervalInMilliseconds ?? TimeSpan.FromMilliseconds(200);
      TimeoutExpirationInMilliseconds = timeoutExpirationInMilliseconds ?? TimeSpan.FromMilliseconds(300);
      QueueName = queueName;
      // Without an explicit routing key the queue name doubles as the routing key.
      RoutingKey = routingKey ?? queueName;
      MessageEncoding = encoding ?? Encoding.UTF8;
      WorkScheduler = workScheduler ?? DefaultScheduler.Instance;
      // Must be assigned before the factories are built below, because the factory helper reads it.
      EndpointResolverFactoryFunc = endpointResolverFactoryFunc;

      BasicProperties = basicProperties ?? new BasicProperties
      {
          Persistent = true,
          DeliveryMode = 2
      };

      // One factory per host; they are tried in order when a connection is needed.
      ConnectionFactories = hostNames
          .Select(hostName => CreateConnectionFactoryWithDefaultOptions(hostName, port, virtualHost, sslServerName, username, password))
          .ToReadOnlyCollection();
  }
  136.  
  /// <summary>
  /// Builds the connection factory for a single host, copying the connection settings from
  /// <paramref name="options"/>.
  /// </summary>
  /// <param name="hostName">Host the factory connects to.</param>
  /// <param name="options">Source of port, virtual host, recovery, SSL and credential settings.</param>
  /// <returns>The configured factory.</returns>
  private IConnectionFactory CreateConnectionFactoryWithOptions(
      string hostName,
      RabbitMqOptions options)
  {
      var factory = new ConnectionFactory();
      factory.HostName = hostName;
      factory.Port = options.Port;
      factory.VirtualHost = options.VirtualHost;
      factory.AutomaticRecoveryEnabled = options.AutomaticRecoveryEnabled;
      factory.NetworkRecoveryInterval = options.NetworkRecoveryInterval;
      factory.TopologyRecoveryEnabled = options.TopologyRecoveryEnabled;
      factory.Ssl = options.Ssl;
      factory.RequestedConnectionTimeout = 3000;
      factory.UserName = options.Username;
      factory.Password = options.Password;
      factory.RequestedHeartbeat = 60;

      // Keep the factory's own resolver unless a custom one was supplied.
      // NOTE(review): the options-based constructor in this file never assigns
      // EndpointResolverFactoryFunc, so this branch looks unreachable from that path - confirm.
      if (EndpointResolverFactoryFunc == null)
      {
          return factory;
      }

      factory.EndpointResolverFactory = EndpointResolverFactoryFunc;
      return factory;
  }
  163.  
  /// <summary>
  /// Builds the connection factory for a single host from explicit settings. Automatic and
  /// topology recovery are enabled with a one-second recovery interval, and SSL is always on.
  /// </summary>
  /// <param name="hostName">Host the factory connects to.</param>
  /// <param name="port">Broker port.</param>
  /// <param name="virtualHost">Virtual host to connect to.</param>
  /// <param name="sslServerName">Server name placed in the SSL options.</param>
  /// <param name="username">Broker user name.</param>
  /// <param name="password">Broker password.</param>
  /// <returns>The configured factory.</returns>
  private IConnectionFactory CreateConnectionFactoryWithDefaultOptions(
      string hostName,
      int port,
      string virtualHost,
      string sslServerName,
      string username,
      string password)
  {
      // NOTE(review): tolerating name-mismatch and chain errors relaxes server certificate
      // validation - confirm this is intended.
      var ssl = new SslOption();
      ssl.ServerName = sslServerName;
      ssl.Enabled = true;
      ssl.AcceptablePolicyErrors = SslPolicyErrors.RemoteCertificateNameMismatch |
                                   SslPolicyErrors.RemoteCertificateChainErrors;

      var factory = new ConnectionFactory();
      factory.HostName = hostName;
      factory.Port = port;
      factory.VirtualHost = virtualHost;
      factory.AutomaticRecoveryEnabled = true;
      factory.NetworkRecoveryInterval = TimeSpan.FromSeconds(1);
      factory.TopologyRecoveryEnabled = true;
      factory.Ssl = ssl;
      factory.RequestedConnectionTimeout = 3000;
      factory.UserName = username;
      factory.Password = password;
      factory.RequestedHeartbeat = 60;

      // Keep the factory's own resolver unless a custom one was supplied.
      if (EndpointResolverFactoryFunc == null)
      {
          return factory;
      }

      factory.EndpointResolverFactory = EndpointResolverFactoryFunc;
      return factory;
  }
  200.  
  /// <summary>
  /// Returns an observable of messages consumed from the configured queue.
  /// </summary>
  /// <param name="beforeAcknowledgeFunc">
  /// Callback invoked with each decoded message; its boolean result decides whether the message
  /// is acknowledged or negatively acknowledged.
  /// </param>
  /// <returns>
  /// The consuming observable wrapped in RetryWithBackoff (presumably resubscribing up to
  /// <see cref="RetryCount"/> times - verify against that extension).
  /// </returns>
  public IObservable<ReceivedMessageResult> ReceiveMessages(Func<string, Task<bool>> beforeAcknowledgeFunc)
  {
      Ensure.ArgumentNotNull(beforeAcknowledgeFunc, nameof(beforeAcknowledgeFunc));

      IObservable<IConnection> connections = EnsureCreateConnectionReuseObservable();

      IObservable<ReceivedMessageResult> deliveries = connections
          .SelectMany(openConnection => ReceiveMessagesInternal(openConnection, beforeAcknowledgeFunc));

      return deliveries.RetryWithBackoff(
          RetryCount,
          attempt => RetryIntervalInMilliseconds,
          error => true,
          WorkScheduler);
  }
  213.  
  /// <summary>
  /// Forwards to <see cref="CreateReceiveMessagesObservable"/> with the same arguments.
  /// </summary>
  private IObservable<ReceivedMessageResult> ReceiveMessagesInternal(IConnection connection, Func<string, Task<bool>> beforeAcknowledgeFunc)
      => CreateReceiveMessagesObservable(connection, beforeAcknowledgeFunc);
  218.  
  /// <summary>
  /// Creates a cold observable that, per subscription, opens a channel on
  /// <paramref name="connection"/>, declares the durable queue, consumes from it with manual
  /// acknowledgement and emits one <see cref="ReceivedMessageResult"/> per delivery.
  /// </summary>
  /// <param name="connection">Open connection the channel is created on.</param>
  /// <param name="beforeAcknowledgeFunc">
  /// Callback run for every decoded message; true acks the delivery, false nacks it with requeue.
  /// </param>
  /// <returns>An observable whose subscription owns the channel and disposes it on unsubscribe.</returns>
  private IObservable<ReceivedMessageResult> CreateReceiveMessagesObservable(IConnection connection, Func<string, Task<bool>> beforeAcknowledgeFunc)
  {
      return Observable.Create<ReceivedMessageResult>(observer =>
      {
          var channel = CreateModel(connection);
          IDisposable subscription = null;
          try
          {
              const bool durable = true;
              channel.QueueDeclare(QueueName, durable, false, false, null);

              var consumer = new EventingBasicConsumer(channel);

              var receivedEventObservable = Observable
                  .FromEventPattern<BasicDeliverEventArgs>(
                      h => consumer.Received += h,
                      h => consumer.Received -= h)
                  .Select(eventPattern => eventPattern.EventArgs)
                  .SelectMany(delivery => ProcessReceivedMessageAsync(delivery, beforeAcknowledgeFunc))
                  .Select(processResult =>
                  {
                      if (processResult.Item1)
                      {
                          channel.BasicAck(processResult.Item2.DeliveryTag, false);
                      }
                      else
                      {
                          // Rejected by the callback: nack this single delivery and requeue it.
                          channel.BasicNack(processResult.Item2.DeliveryTag, false, true);
                      }

                      return new ReceivedMessageResult(processResult.Item1, processResult.Item3);
                  });

              // Attach the Received handler BEFORE consuming starts, so a delivery that arrives
              // immediately cannot be raised while nobody is listening. Subscribing the observer
              // itself also forwards pipeline errors (callback or ack failures) to OnError
              // instead of leaving them unhandled.
              subscription = receivedEventObservable.Subscribe(observer);

              channel.BasicConsume(QueueName, false, consumer);

              // Track the channel so disposing the service closes it as well.
              Disposables.Add(channel);
          }
          catch
          {
              // Setup failed part-way: release what was created, then let the failure surface.
              subscription?.Dispose();
              channel.Dispose();
              throw;
          }

          return Disposable.Create(() =>
          {
              subscription.Dispose();

              // Remove() disposes the channel when it is still tracked; this also stops the
              // composite from accumulating one stale channel per (re)subscription.
              if (!Disposables.Remove(channel))
              {
                  channel.Dispose();
              }
          });
      });
  }
  257.  
  /// <summary>
  /// Decodes the delivery body with <see cref="MessageEncoding"/> and passes the text to
  /// <paramref name="beforeAcknowledgeFunc"/>.
  /// </summary>
  /// <param name="eventArgs">Delivery raised by the consumer.</param>
  /// <param name="beforeAcknowledgeFunc">Callback whose result becomes the first tuple item.</param>
  /// <returns>A tuple of (callback result, original delivery, decoded message text).</returns>
  private async Task<Tuple<bool, BasicDeliverEventArgs, string>> ProcessReceivedMessageAsync(
      BasicDeliverEventArgs eventArgs,
      Func<string, Task<bool>> beforeAcknowledgeFunc)
  {
      string text = MessageEncoding.GetString(eventArgs.Body);
      bool accepted = await beforeAcknowledgeFunc(text).ConfigureAwait(false);

      return new Tuple<bool, BasicDeliverEventArgs, string>(accepted, eventArgs, text);
  }
  267.  
  /// <summary>
  /// Publishes <paramref name="payload"/> and exposes the outcome as a task.
  /// </summary>
  /// <typeparam name="T">Reference type of the payload.</typeparam>
  /// <param name="payload">Message to publish; must not be null.</param>
  /// <param name="cancellationToken">Token handed to the observable-to-task conversion.</param>
  /// <returns>A task built from the publish observable, which emits true once the broker confirms.</returns>
  public Task<bool> PublishMessageAsync<T>(
      T payload,
      CancellationToken cancellationToken = default(CancellationToken)) where T : class
  {
      Ensure.ArgumentNotNull(payload, nameof(payload));

      Logger.Debug("PublishMessageAsync<T> STARTED!");

      IObservable<bool> publication = CreateObservableToPublishMessage(payload);
      return publication.ToTask(cancellationToken);
  }
  279.  
  /// <summary>
  /// Composes the publish pipeline: obtain a connection, publish on it, and wrap the whole
  /// sequence in RetryWithBackoff.
  /// </summary>
  /// <typeparam name="T">Reference type of the payload.</typeparam>
  /// <param name="payload">Message to publish.</param>
  /// <returns>The composed observable of publish outcomes.</returns>
  private IObservable<bool> CreateObservableToPublishMessage<T>(T payload) where T : class
  {
      Logger.Debug("CreateObservableToPublishMessage<T> STARTED!");

      IObservable<IConnection> connections = EnsureCreateConnectionReuseObservable();

      IObservable<bool> published = connections
          .SelectMany(openConnection => PublishMessageObservable(payload, openConnection));

      return published.RetryWithBackoff(
          RetryCount,
          attempt => RetryIntervalInMilliseconds,
          error => true,
          WorkScheduler);
  }
  292.  
  /// <summary>
  /// Creates a cold observable that publishes <paramref name="payload"/> on a fresh channel and
  /// waits for the broker's publisher confirm.
  /// </summary>
  /// <typeparam name="T">Reference type of the payload.</typeparam>
  /// <param name="payload">Message to publish; serialized with <see cref="MessageEncoding"/>.</param>
  /// <param name="connection">Open connection the channel is created on.</param>
  /// <returns>
  /// An observable that emits true and completes when the publish is confirmed, and errors with
  /// a <c>MessagingException</c> when it is not confirmed in time or any exception occurs.
  /// </returns>
  private IObservable<bool> PublishMessageObservable<T>(T payload, IConnection connection) where T : class
  {
      return Observable.Create<bool>(observer =>
      {
          Logger.Debug("PublishMessageObservable<T> STARTED");
          try
          {
              bool acknowledged;

              Logger.Debug($"WILL CREATE IModel WITH CONNECTION: {connection?.Endpoint?.HostName} - {connection?.ClientProvidedName}");
              using (IModel channel = CreateModel(connection))
              {
                  //channel.QueueBind(QueueName, Exchange, RoutingKey);
                  Logger.Debug("BasicPublish");
                  // Publish with the configured routing key. It falls back to the queue name when
                  // none was supplied, so default setups behave as before while an explicit
                  // routing key is now honoured instead of being silently ignored.
                  channel.BasicPublish(Exchange, RoutingKey, true, BasicProperties, payload.ToByteArray(MessageEncoding));
                  Logger.Debug("WaitForConfirms");
                  acknowledged = channel.WaitForConfirms(TimeoutExpirationInMilliseconds);
                  Logger.Debug("IModel.Close()");
                  channel.Close();
                  // Leaving the using block disposes the channel exactly once.
                  Logger.Debug("IModel.Dispose()");
              }

              if (!acknowledged)
              {
                  Logger.Debug("DID NOT ACKNOWLEDGE MESSAGE");
                  observer.OnError(new MessagingException("RabbitMQ message could not delivered"));
              }
              else
              {
                  Logger.Debug("MESSAGE ACKNOWLEDGED");
                  observer.OnNext(true);
                  observer.OnCompleted();
              }
          }
          catch (Exception ex)
          {
              Logger.Debug($"PublishMessageObservable<T> THROWN EXCEPTION: {ex}");
              observer.OnError(
                  new MessagingException($"Unexpected RabbitMQ error: {ex.Message}", ex));
          }

          // All work is done synchronously above, so there is nothing to cancel on unsubscribe.
          return Disposable.Create(() => { });
      });
  }
  335.  
  /// <summary>
  /// Creates a cold observable that yields a single open connection: the first open connection
  /// already tracked in <see cref="Connections"/>, or a new one created by trying each entry of
  /// <see cref="ConnectionFactories"/> in order.
  /// </summary>
  /// <returns>
  /// An observable that emits one connection and completes, or errors with a
  /// <c>MessagingException</c> when no host can be reached or an unexpected exception occurs.
  /// </returns>
  private IObservable<IConnection> EnsureCreateConnectionReuseObservable()
  {
      return Observable.Create<IConnection>(observer =>
      {
          using (AsyncLockObject.Lock(LockKey))
          {
              Logger.Debug("EnsureCreateConnectionReuseObservable() STARTED");
              try
              {
                  Logger.Debug("GET FIRST ACTIVE CONNECTION IF EXISTS");
                  // FirstOrDefault yields null when nothing is open (always the case on the very
                  // first call). Dereferencing it directly threw NullReferenceException, which the
                  // catch below turned into an error, so the create branch was never reached.
                  var firstActiveConnection = Connections.FirstOrDefault(p => p.Connection.IsOpen)?.Connection;
                  if (firstActiveConnection.IsNotNull())
                  {
                      Logger.Debug("FIRST ACTIVE CONNECTION FOUND");
                      if (Connections.Count > 1)
                      {
                          Logger.Debug("MORE THAN ONE CONNECTION - MAKE SURE ANY OTHER CONNECTION IS CLOSED, DISPOSED AND REMOVED FROM COLLECTION");
                          Connections
                              .Where(p => p.Connection != firstActiveConnection)
                              .ForEachReverse(CloseDisposeAndRemoveConnection);
                      }

                      observer.OnNext(firstActiveConnection);
                      observer.OnCompleted();
                  }
                  else
                  {
                      Logger.Debug("NO ACTIVE/OPEN CONNECTION");
                      if (Connections.Any())
                      {
                          Logger.Debug("FOUND CONNECTIONS - CLOSE AND DISPOSE ALL CONNECTIONS");
                          Connections
                              .ForEachReverse(CloseDisposeAndRemoveConnection);
                      }

                      Logger.Debug("ASSIGN DEFAULT NULL CONNECTION VARIABLE");
                      IConnection newConnection = default(IConnection);

                      Logger.Debug("WILL LOOP THE CONNECTION FACTORIES");
                      foreach (IConnectionFactory connectionFactory in ConnectionFactories)
                      {
                          // 'as' instead of a hard cast: a factory of another type must not fail
                          // the whole attempt just because its host name cannot be logged.
                          Logger.Debug($"CreateConnectionWithFactory(connectionFactory): WILL TRY - {(connectionFactory as ConnectionFactory)?.Endpoint?.HostName}");
                          newConnection = CreateConnectionWithFactory(connectionFactory);
                          Logger.Debug($"RETRIEVED NEW CONNECTION: {newConnection?.Endpoint?.HostName} - {newConnection?.ClientProvidedName}: NULL?: {newConnection.IsNull()}");
                          if (newConnection.IsNotNull())
                          {
                              Logger.Debug($"NEW CONNECTION IS ESTABLISHED: {newConnection?.Endpoint?.HostName} - {newConnection?.ClientProvidedName}");
                              break;
                          }
                      }

                      if (newConnection.IsNull())
                      {
                          Logger.Debug("NEW CONNECTION TRIED TO RETRIEVED BY ALL CONNECTION FACTORIES IS STILL NULL - THROW ERROR");
                          observer.OnError(
                              new MessagingException("Could not establish a connection with any of the RabbitMQ hosts"));
                      }
                      else
                      {
                          Logger.Debug("ADD NEW CONNECTION TO COLLECTION AND RETURN TO OBSERVER");

                          // Route the connection's blocked/shutdown events to the handlers, keeping
                          // the subscriptions so they are released together with the context.
                          var connectionBlockedEventDisposable = Observable
                              .FromEventPattern<ConnectionBlockedEventArgs>(
                                  h => newConnection.ConnectionBlocked += h,
                                  h => newConnection.ConnectionBlocked -= h)
                              .Select(eventArgs => Tuple.Create(eventArgs.EventArgs, newConnection))
                              .Subscribe(OnConnectionBlockedEventArgs);

                          var connectionShutdownEventDisposable = Observable
                              .FromEventPattern<ShutdownEventArgs>(
                                  h => newConnection.ConnectionShutdown += h,
                                  h => newConnection.ConnectionShutdown -= h)
                              .Select(eventArgs => Tuple.Create(eventArgs.EventArgs, newConnection))
                              .Subscribe(OnConnectionShutdownEventArgs);

                          var connectionContext = new RabbitMqConnectionContext(
                              newConnection,
                              connectionBlockedEventDisposable,
                              connectionShutdownEventDisposable);

                          // Shared (static) tracking for reuse, plus per-instance ownership for disposal.
                          Connections.Add(connectionContext);
                          Disposables.Add(connectionContext);

                          observer.OnNext(newConnection);
                          observer.OnCompleted();
                      }
                  }
              }
              catch (Exception ex)
              {
                  Logger.Debug($"EXCEPTION WHILE TRYING TO EnsureCreateConnectionReuseObservable(): {ex}");
                  observer.OnError(
                      new MessagingException($"Unexpected error: {ex.Message}", ex));
              }

              // All work is done synchronously above, so there is nothing to cancel on unsubscribe.
              return Disposable.Create(() => { });
          }
      });
  }
  435.  
  /// <summary>
  /// Handles a connection-blocked notification by logging the reason, then closing and
  /// disposing the affected connection.
  /// </summary>
  /// <param name="connectionBlockedTuple">Event arguments paired with the connection that raised them.</param>
  private void OnConnectionBlockedEventArgs(Tuple<ConnectionBlockedEventArgs, IConnection> connectionBlockedTuple)
  {
      Logger.Debug($"Connection Blocked - {connectionBlockedTuple.Item1.Reason}");

      var connection = connectionBlockedTuple.Item2;
      try
      {
          connection.Close();
      }
      finally
      {
          // Release the connection even when Close() throws; any exception still propagates.
          connection.Dispose();
      }
  }
  442.  
  /// <summary>
  /// Handles a connection-shutdown notification by logging the reply text, then closing and
  /// disposing the affected connection.
  /// </summary>
  /// <param name="connectionShutdownTuple">Event arguments paired with the connection that raised them.</param>
  private void OnConnectionShutdownEventArgs(Tuple<ShutdownEventArgs, IConnection> connectionShutdownTuple)
  {
      Logger.Debug($"Connection Shutdown - {connectionShutdownTuple.Item1.ReplyText}");

      var connection = connectionShutdownTuple.Item2;
      try
      {
          // NOTE(review): the connection is already shutting down here; depending on the client
          // version Close() may throw for an already-closed connection - confirm.
          connection.Close();
      }
      finally
      {
          // Release the connection even when Close() throws; any exception still propagates.
          connection.Dispose();
      }
  }
  449.  
  /// <summary>
  /// Disposes <paramref name="connectionContext"/> and removes that same context from the shared
  /// <see cref="Connections"/> collection.
  /// </summary>
  /// <param name="connectionContext">Context to dispose and stop tracking.</param>
  private void CloseDisposeAndRemoveConnection(RabbitMqConnectionContext connectionContext)
  {
      Logger.Debug($"WILL CLOSE AND DISPOSE CONNECTION: {connectionContext.Connection?.ClientProvidedName}");
      connectionContext.Dispose();

      // TryTake hands back whichever item is next in the collection, not a specific one, so a
      // single TryTake could drop a live connection and keep the disposed one. Take items until
      // the target shows up and put every other item back.
      var skipped = new List<RabbitMqConnectionContext>();
      var removed = false;
      while (Connections.TryTake(out RabbitMqConnectionContext candidate))
      {
          if (Equals(candidate, connectionContext))
          {
              removed = true;
              break;
          }

          skipped.Add(candidate);
      }

      foreach (var other in skipped)
      {
          Connections.Add(other);
      }

      if (!removed)
      {
          Logger.Error("ERROR! Attention: RabbitMQ Connection could not be removed from BlockingCollection");
      }
  }
  460.  
  /// <summary>
  /// Tries to open a connection with <paramref name="connectionFactory"/> on a best-effort basis.
  /// </summary>
  /// <param name="connectionFactory">Factory for the host to try.</param>
  /// <returns>The new connection, or null when the attempt fails for any reason.</returns>
  private static IConnection CreateConnectionWithFactory(IConnectionFactory connectionFactory)
  {
      try
      {
          return connectionFactory.CreateConnection();
      }
      catch (Exception ex)
      {
          // Still best-effort (the caller moves on to the next host), but record why this host
          // failed instead of discarding the exception silently. This method is static, so the
          // logger is resolved here for the same type the instance logger defaults to.
          LogManager.GetLogger(typeof(RabbitMqMessagingServiceObservable))
              .Debug("CreateConnectionWithFactory FAILED TO CREATE CONNECTION", ex);
          return default(IConnection);
      }
  }
  473.  
  /// <summary>
  /// Opens a channel on <paramref name="connection"/> and calls <c>ConfirmSelect</c> on it so
  /// callers can use <c>WaitForConfirms</c>.
  /// </summary>
  /// <param name="connection">Open connection to create the channel on.</param>
  /// <returns>The new channel; the caller owns it and must dispose it.</returns>
  private static IModel CreateModel(IConnection connection)
  {
      var channel = connection.CreateModel();
      try
      {
          channel.ConfirmSelect();
          return channel;
      }
      catch
      {
          // The caller never receives the channel on failure, so release it here.
          channel.Dispose();
          throw;
      }
  }
  480.  
  /// <summary>
  /// Releases the resources held by this instance and tells the GC that the finalizer no
  /// longer needs to run.
  /// </summary>
  public void Dispose()
  {
      // Explicit request from the caller: run the deterministic cleanup path.
      Dispose(disposing: true);

      // Cleanup is done, so finalization would have nothing left to do.
      GC.SuppressFinalize(this);
  }
  491.  
  /// <summary>
  /// Core cleanup routine shared by <see cref="Dispose()"/> and the finalizer.
  /// </summary>
  /// <param name="disposing">
  /// True when called from <see cref="Dispose()"/>: the managed disposables are released.
  /// False when called from the finalizer: nothing is done.
  /// </param>
  protected virtual void Dispose(bool disposing)
  {
      if (!disposing)
      {
          // Finalizer path: managed objects are left to the garbage collector, and this class
          // holds no unmanaged resources of its own to release here.
          return;
      }

      // Deterministic path: dispose everything this instance has been tracking.
      Disposables.Dispose();
  }
  510.  
  /// <summary>
  /// Finalizer: runs only when the instance was never disposed explicitly.
  /// </summary>
  ~RabbitMqMessagingServiceObservable()
  {
      // Non-deterministic path; managed resources are left to the garbage collector.
      Dispose(disposing: false);
  }
  519. }
  520. }
Add Comment
Please, Sign In to add comment