Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- public class EnqueueCommand : IDisposable
- {
- private readonly IConnection _conn;
- private readonly IModel _channel;
- private readonly string _defaultQueue;
- public EnqueueCommand(IConfiguration config)
- {
- var factory = new ConnectionFactory
- {
- UserName = config["RabbitConnectionOptions:UserName"],
- Password = config["RabbitConnectionOptions:Passwd"],
- VirtualHost = config["RabbitConnectionOptions:VirtualHost"],
- HostName = config["RabbitConnectionOptions:Host"]
- };
- _conn = factory.CreateConnection();
- _channel = _conn.CreateModel();
- _defaultQueue = config["RabbitConnectionOptions:QueueName"];
- }
- public void Execute<T>(T message, string queueName = null)
- {
- var messageBytes = Serialize(message);
- _channel.QueueDeclare(queue: queueName ?? _defaultQueue,
- durable: true,
- exclusive: false,
- autoDelete: false,
- arguments: null);
- _channel.BasicPublish(exchange: string.Empty,
- routingKey: queueName ?? _defaultQueue,
- basicProperties: null,
- body: messageBytes);
- }
- public void ExecuteToExchange<T>(T message, string exchange, string exchangeType)
- {
- var messageBytes = Serialize(message);
- _channel.ExchangeDeclare(exchange: exchange, type: exchangeType);
- _channel.BasicPublish(exchange: exchange,
- routingKey: string.Empty,
- basicProperties: null,
- body: messageBytes);
- }
- private static byte[] Serialize<T>(T message)
- {
- byte[] messageBytes;
- if (typeof(T) == typeof(byte[]))
- {
- messageBytes = message as byte[];
- }
- else
- {
- messageBytes = LZ4MessagePackSerializer.Serialize(message,
- MessagePack.Resolvers.ContractlessStandardResolver.Instance);
- }
- return messageBytes;
- }
- public void Dispose()
- {
- _channel?.Dispose();
- _conn?.Dispose();
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement