Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- using System;
- using System.Collections.Generic;
- using System.Net.Security;
- using System.Reactive.Concurrency;
- using System.Reactive.Disposables;
- using System.Reactive.Linq;
- using System.Reactive.Threading.Tasks;
- using System.Text;
- using System.Threading.Tasks;
- using Extend;
- using RabbitMQ.Client;
- using VRSK.UW.Common.Messaging.Extensions;
- using Genesis.Ensure;
- using RabbitMQ.Client.Events;
- using RabbitMQ.Client.Framing;
- using VRSK.UW.Common.Messaging.Models;
- using System.Collections.Concurrent;
- using System.Linq;
- using System.Threading;
- using log4net;
- using System.Reflection;
- using VRSK.UW.Infrastructure.Models;
- using VRSK.UW.Infrastructure.Extensions;
- namespace VRSK.UW.Common.Messaging.Services.RabbitMQ
- {
- public class RabbitMqMessagingServiceObservable : IMessagingService
- {
- protected static BlockingCollection<RabbitMqConnectionContext> Connections = new BlockingCollection<RabbitMqConnectionContext>();
- private readonly ILog Logger;
- private readonly AsyncDuplicateLock AsyncLockObject = new AsyncDuplicateLock();
- private readonly string LockKey = "RabbitMqMessagingService";
- protected readonly string Exchange;
- protected readonly string QueueName;
- protected readonly string RoutingKey;
- protected readonly int RetryCount;
- protected readonly IBasicProperties BasicProperties;
- protected readonly TimeSpan TimeoutExpirationInMilliseconds;
- protected readonly TimeSpan RetryIntervalInMilliseconds;
- protected readonly CompositeDisposable Disposables = new CompositeDisposable();
- protected readonly Encoding MessageEncoding;
- protected readonly IScheduler WorkScheduler;
- protected readonly Func<IEnumerable<AmqpTcpEndpoint>, IEndpointResolver> EndpointResolverFactoryFunc;
- protected virtual IReadOnlyCollection<IConnectionFactory> ConnectionFactories { get; private set; }
- public RabbitMqMessagingServiceObservable(
- RabbitMqOptions options,
- ILog logger = null)
- {
- Ensure.ArgumentNotNull(options, nameof(options));
- Logger = Logger ?? LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
- if (options.Ssl.IsNull())
- {
- options.Ssl = new SslOption
- {
- ServerName = options.SslServerName,
- Enabled = true,
- AcceptablePolicyErrors = SslPolicyErrors.RemoteCertificateNameMismatch |
- SslPolicyErrors.RemoteCertificateChainErrors
- };
- }
- if (options.BasicProperties.IsNull())
- {
- options.BasicProperties = new BasicProperties
- {
- Persistent = true,
- DeliveryMode = 2
- };
- }
- Exchange = options.Exchange;
- RetryCount = options.RetryCount;
- RetryIntervalInMilliseconds = options.RetryIntervalInMilliseconds;
- QueueName = options.QueueName;
- RoutingKey = options.RoutingKey ?? options.QueueName;
- BasicProperties = options.BasicProperties;
- TimeoutExpirationInMilliseconds = options.TimeoutExpirationInMilliseconds;
- MessageEncoding = options.MessagEncoding;
- WorkScheduler = options.WorkScheduler;
- ConnectionFactories = options.HostNames
- .Select(hostName => CreateConnectionFactoryWithOptions(hostName, options)).ToReadOnlyCollection();
- }
- public RabbitMqMessagingServiceObservable(
- string[] hostNames,
- int port,
- string virtualHost,
- string sslServerName,
- string username,
- string password,
- string queueName,
- string routingKey = null,
- string exchange = "",
- IBasicProperties basicProperties = null,
- TimeSpan? timeoutExpirationInMilliseconds = null,
- int retryCount = 5,
- TimeSpan? retryIntervalInMilliseconds = null,
- Encoding encoding = null,
- IScheduler workScheduler = null,
- Func<IEnumerable<AmqpTcpEndpoint>, IEndpointResolver> endpointResolverFactoryFunc = null,
- ILog logger = null)
- {
- Ensure.ArgumentNotNull(hostNames, nameof(hostNames));
- Ensure.ArgumentNotNull(virtualHost, nameof(virtualHost));
- Ensure.ArgumentNotNull(sslServerName, nameof(sslServerName));
- Ensure.ArgumentNotNull(username, nameof(username));
- Ensure.ArgumentNotNull(password, nameof(password));
- Ensure.ArgumentNotNull(queueName, nameof(queueName));
- Logger = Logger ?? LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
- Exchange = exchange;
- RetryCount = retryCount;
- RetryIntervalInMilliseconds = retryIntervalInMilliseconds ?? TimeSpan.FromMilliseconds(200);
- TimeoutExpirationInMilliseconds = timeoutExpirationInMilliseconds ?? TimeSpan.FromMilliseconds(300);
- QueueName = queueName;
- RoutingKey = routingKey ?? queueName;
- MessageEncoding = encoding ?? Encoding.UTF8;
- WorkScheduler = workScheduler ?? DefaultScheduler.Instance;
- EndpointResolverFactoryFunc = endpointResolverFactoryFunc;
- BasicProperties = basicProperties ?? new BasicProperties
- {
- Persistent = true,
- DeliveryMode = 2
- };
- ConnectionFactories = hostNames
- .Select(hostName => CreateConnectionFactoryWithDefaultOptions(hostName, port, virtualHost, sslServerName, username, password))
- .ToReadOnlyCollection();
- }
- private IConnectionFactory CreateConnectionFactoryWithOptions(
- string hostName,
- RabbitMqOptions options)
- {
- var cf = new ConnectionFactory
- {
- HostName = hostName,
- Port = options.Port,
- VirtualHost = options.VirtualHost,
- AutomaticRecoveryEnabled = options.AutomaticRecoveryEnabled,
- NetworkRecoveryInterval = options.NetworkRecoveryInterval,
- TopologyRecoveryEnabled = options.TopologyRecoveryEnabled,
- Ssl = options.Ssl,
- RequestedConnectionTimeout = 3000,
- UserName = options.Username,
- Password = options.Password,
- RequestedHeartbeat = 60,
- };
- if (EndpointResolverFactoryFunc != null)
- {
- cf.EndpointResolverFactory = EndpointResolverFactoryFunc;
- }
- return cf;
- }
- private IConnectionFactory CreateConnectionFactoryWithDefaultOptions(
- string hostName,
- int port,
- string virtualHost,
- string sslServerName,
- string username,
- string password)
- {
- var cf = new ConnectionFactory
- {
- HostName = hostName,
- Port = port,
- VirtualHost = virtualHost,
- AutomaticRecoveryEnabled = true,
- NetworkRecoveryInterval = TimeSpan.FromSeconds(1),
- TopologyRecoveryEnabled = true,
- Ssl = new SslOption
- {
- ServerName = sslServerName,
- Enabled = true,
- AcceptablePolicyErrors = SslPolicyErrors.RemoteCertificateNameMismatch |
- SslPolicyErrors.RemoteCertificateChainErrors
- },
- RequestedConnectionTimeout = 3000,
- UserName = username,
- Password = password,
- RequestedHeartbeat = 60,
- };
- if (EndpointResolverFactoryFunc != null)
- {
- cf.EndpointResolverFactory = EndpointResolverFactoryFunc;
- }
- return cf;
- }
- public IObservable<ReceivedMessageResult> ReceiveMessages(Func<string, Task<bool>> beforeAcknowledgeFunc)
- {
- Ensure.ArgumentNotNull(beforeAcknowledgeFunc, nameof(beforeAcknowledgeFunc));
- return EnsureCreateConnectionReuseObservable()
- .SelectMany(connection => ReceiveMessagesInternal(connection, beforeAcknowledgeFunc))
- .RetryWithBackoff(
- RetryCount,
- n => RetryIntervalInMilliseconds,
- ex => true,
- WorkScheduler);
- }
- private IObservable<ReceivedMessageResult> ReceiveMessagesInternal(IConnection connection, Func<string, Task<bool>> beforeAcknowledgeFunc)
- {
- return CreateReceiveMessagesObservable(connection, beforeAcknowledgeFunc);
- }
- private IObservable<ReceivedMessageResult> CreateReceiveMessagesObservable(IConnection connection, Func<string, Task<bool>> beforeAcknowledgeFunc)
- {
- return Observable.Create<ReceivedMessageResult>(observer =>
- {
- var channel = CreateModel(connection);
- const bool durable = true;
- channel.QueueDeclare(QueueName, durable, false, false, null);
- var consumer = new EventingBasicConsumer(channel);
- var receivedEventObservable = Observable
- .FromEventPattern<BasicDeliverEventArgs>(
- h => consumer.Received += h,
- h => consumer.Received -= h)
- .Select(eventArgs => eventArgs.EventArgs)
- .SelectMany(ea => ProcessReceivedMessageAsync(ea, beforeAcknowledgeFunc))
- .Select(processResult =>
- {
- if (processResult.Item1)
- {
- channel.BasicAck(processResult.Item2.DeliveryTag, false);
- }
- else
- {
- channel.BasicNack(processResult.Item2.DeliveryTag, false, true);
- }
- return new ReceivedMessageResult(processResult.Item1, processResult.Item3);
- });
- channel.BasicConsume(QueueName, false, consumer);
- Disposables.Add(channel);
- var disposableSubscribe = receivedEventObservable.Subscribe(result => observer.OnNext(result));
- return Disposable.Create(() => {
- disposableSubscribe.Dispose();
- channel.Dispose();
- });
- });
- }
- private async Task<Tuple<bool, BasicDeliverEventArgs, string>> ProcessReceivedMessageAsync(
- BasicDeliverEventArgs eventArgs,
- Func<string, Task<bool>> beforeAcknowledgeFunc)
- {
- var body = eventArgs.Body;
- var message = MessageEncoding.GetString(body);
- var messageAcknowledged = await beforeAcknowledgeFunc(message).ConfigureAwait(false);
- return Tuple.Create(messageAcknowledged, eventArgs, message);
- }
- public Task<bool> PublishMessageAsync<T>(
- T payload,
- CancellationToken cancellationToken = default(CancellationToken)) where T : class
- {
- Ensure.ArgumentNotNull(payload, nameof(payload));
- Logger.Debug("PublishMessageAsync<T> STARTED!");
- return CreateObservableToPublishMessage(payload)
- .ToTask(cancellationToken);
- }
- private IObservable<bool> CreateObservableToPublishMessage<T>(T payload) where T : class
- {
- Logger.Debug("CreateObservableToPublishMessage<T> STARTED!");
- return EnsureCreateConnectionReuseObservable()
- .SelectMany(connection => PublishMessageObservable(payload, connection))
- .RetryWithBackoff(
- RetryCount,
- n => RetryIntervalInMilliseconds,
- ex => true,
- WorkScheduler);
- }
- private IObservable<bool> PublishMessageObservable<T>(T payload, IConnection connection) where T : class
- {
- return Observable.Create<bool>(observer =>
- {
- Logger.Debug("PublishMessageObservable<T> STARTED");
- try
- {
- Logger.Debug($"WILL CREATE IModel WITH CONNECTION: {connection?.Endpoint?.HostName} - {connection?.ClientProvidedName}");
- using (IModel channel = CreateModel(connection))
- {
- //channel.QueueBind(QueueName, Exchange, RoutingKey);
- Logger.Debug("BasicPublish");
- channel.BasicPublish(Exchange, QueueName, true, BasicProperties, payload.ToByteArray(MessageEncoding));
- Logger.Debug("WaitForConfirms");
- bool acknowledged = channel.WaitForConfirms(TimeoutExpirationInMilliseconds);
- Logger.Debug("IModel.Close()");
- channel.Close();
- Logger.Debug("IModel.Dispose()");
- channel.Dispose();
- if (!acknowledged)
- {
- Logger.Debug("DID NOT ACKNOWLEDGE MESSAGE");
- observer.OnError(new MessagingException("RabbitMQ message could not delivered"));
- }
- else
- {
- Logger.Debug("MESSAGE ACKNOWLEDGED");
- observer.OnNext(true);
- observer.OnCompleted();
- }
- }
- }
- catch (Exception ex)
- {
- Logger.Debug($"PublishMessageObservable<T> THROWN EXCEPTION: {ex}");
- observer.OnError(
- new MessagingException($"Unexpected RabbitMQ error: {ex.Message}", ex));
- }
- return Disposable.Create(() => { });
- });
- }
- private IObservable<IConnection> EnsureCreateConnectionReuseObservable()
- {
- return Observable.Create<IConnection>(observer =>
- {
- using (AsyncLockObject.Lock(LockKey))
- {
- Logger.Debug("EnsureCreateConnectionReuseObservable() STARTED");
- try
- {
- Logger.Debug("GET FIRST ACTIVE CONNECTION IF EXISTS");
- var firstActiveConnection = Connections.FirstOrDefault(p => p.Connection.IsOpen).Connection;
- if (firstActiveConnection.IsNotNull())
- {
- Logger.Debug("FIRST ACTIVE CONNECTION FOUND");
- if (Connections.Count > 1)
- {
- Logger.Debug("MORE THAN ONE CONNECTION - MAKE SURE ANY OTHER CONNECTION IS CLOSED, DISPOSED AND REMOVED FROM COLLECTION");
- Connections
- .Where(p => p.Connection != firstActiveConnection)
- .ForEachReverse(CloseDisposeAndRemoveConnection);
- }
- observer.OnNext(firstActiveConnection);
- observer.OnCompleted();
- }
- else
- {
- Logger.Debug("NO ACTIVE/OPEN CONNECTION");
- if (Connections.Any())
- {
- Logger.Debug("FOUND CONNECTIONS - CLOSE AND DISPOSE ALL CONNECTIONS");
- Connections
- .ForEachReverse(CloseDisposeAndRemoveConnection);
- }
- Logger.Debug("ASSIGN DEFAULT NULL CONNECTION VARIABLE");
- IConnection newConnection = default(IConnection);
- Logger.Debug("WILL LOOP THE CONNECTION FACTORIES");
- foreach (IConnectionFactory connectionFactory in ConnectionFactories)
- {
- Logger.Debug($"CreateConnectionWithFactory(connectionFactory): WILL TRY - {((ConnectionFactory)connectionFactory).Endpoint?.HostName}");
- newConnection = CreateConnectionWithFactory(connectionFactory);
- Logger.Debug($"RETRIEVED NEW CONNECTION: {newConnection?.Endpoint?.HostName} - {newConnection?.ClientProvidedName}: NULL?: {newConnection.IsNull()}");
- if (newConnection.IsNotNull())
- {
- Logger.Debug($"NEW CONNECTION IS ESTABLISHED: {newConnection?.Endpoint?.HostName} - {newConnection?.ClientProvidedName}");
- break;
- }
- }
- if (newConnection.IsNull())
- {
- Logger.Debug("NEW CONNECTION TRIED TO RETRIEVED BY ALL CONNECTION FACTORIES IS STILL NULL - THROW ERROR");
- observer.OnError(
- new MessagingException("Could not establish a connection with any of the RabbitMQ hosts"));
- }
- else
- {
- Logger.Debug("ADD NEW CONNECTION TO COLLECTION AND RETURN TO OBSERVER");
- var connectionBlockedEventDisposable = Observable
- .FromEventPattern<ConnectionBlockedEventArgs>(
- h => newConnection.ConnectionBlocked += h,
- h => newConnection.ConnectionBlocked -= h)
- .Select(eventArgs => Tuple.Create(eventArgs.EventArgs, newConnection))
- .Subscribe(OnConnectionBlockedEventArgs);
- var connectionShutdownEventDisposable = Observable
- .FromEventPattern<ShutdownEventArgs>(
- h => newConnection.ConnectionShutdown += h,
- h => newConnection.ConnectionShutdown -= h)
- .Select(eventArgs => Tuple.Create(eventArgs.EventArgs, newConnection))
- .Subscribe(OnConnectionShutdownEventArgs);
- var connectionContext = new RabbitMqConnectionContext(
- newConnection,
- connectionBlockedEventDisposable,
- connectionShutdownEventDisposable);
- Connections.Add(connectionContext);
- Disposables.Add(connectionContext);
- observer.OnNext(newConnection);
- observer.OnCompleted();
- }
- }
- }
- catch (Exception ex)
- {
- Logger.Debug($"EXCEPTION WHILE TRYING TO EnsureCreateConnectionReuseObservable(): {ex}");
- observer.OnError(
- new MessagingException($"Unexpected error: {ex.Message}", ex));
- }
- return Disposable.Create(() => { });
- }
- });
- }
- private void OnConnectionBlockedEventArgs(Tuple<ConnectionBlockedEventArgs, IConnection> connectionBlockedTuple)
- {
- Logger.Debug($"Connection Blocked - {connectionBlockedTuple.Item1.Reason}");
- connectionBlockedTuple.Item2.Close();
- connectionBlockedTuple.Item2.Dispose();
- }
- private void OnConnectionShutdownEventArgs(Tuple<ShutdownEventArgs, IConnection> connectionShutdownTuple)
- {
- Logger.Debug($"Connection Shutdown - {connectionShutdownTuple.Item1.ReplyText}");
- connectionShutdownTuple.Item2.Close();
- connectionShutdownTuple.Item2.Dispose();
- }
- private void CloseDisposeAndRemoveConnection(RabbitMqConnectionContext connectionContext)
- {
- Logger.Debug($"WILL CLOSE AND DISPOSE CONNECTION: {connectionContext.Connection?.ClientProvidedName}");
- connectionContext.Dispose();
- var removed = Connections.TryTake(out RabbitMqConnectionContext conn);
- if (!removed)
- {
- Logger.Error("ERROR! Attention: RabbitMQ Connection could not be removed from BlockingCollection");
- }
- }
- private static IConnection CreateConnectionWithFactory(IConnectionFactory connectionFactory)
- {
- var connection = default(IConnection);
- try
- {
- connection = connectionFactory.CreateConnection();
- }
- catch
- { }
- return connection;
- }
- private static IModel CreateModel(IConnection connection)
- {
- var channel = connection.CreateModel();
- channel.ConfirmSelect();
- return channel;
- }
- public void Dispose()
- {
- // If this function is being called the user wants to release the
- // resources. lets call the Dispose which will do this for us.
- Dispose(true);
- // Now since we have done the cleanup already there is nothing left
- // for the Finalizer to do. So lets tell the GC not to call it later.
- GC.SuppressFinalize(this);
- }
- protected virtual void Dispose(bool disposing)
- {
- if (disposing)
- {
- //someone want the deterministic release of all resources
- //Let us release all the managed resources
- Disposables.Dispose();
- }
- else
- {
- // Do nothing, no one asked a dispose, the object went out of
- // scope and finalized is called so lets next round of GC
- // release these resources
- }
- // Release the unmanaged resource in any case as they will not be
- // released by GC
- }
- ~RabbitMqMessagingServiceObservable()
- {
- // The object went out of scope and finalized is called
- // Lets call dispose in to release unmanaged resources
- // the managed resources will anyways be released when GC
- // runs the next time.
- Dispose(false);
- }
- }
- }
Add Comment
Please, Sign In to add comment