Advertisement
Guest User

Untitled

a guest
Aug 25th, 2019
127
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C# 2.00 KB | None | 0 0
  1. using Microsoft.Extensions.Options;
  2. using Newtonsoft.Json;
  3. using ProductService.Models;
  4. using ProductService.Queues.Interfaces;
  5. using ProductService.Repository;
  6. using RabbitMQ.Client;
  7. using RabbitMQ.Client.Events;
  8. using System.Text;
  9.  
  10. namespace ProductService.Queues.AMQP
  11. {
  12.     public class AMQPReleasedProductsDataEventSubscriber : IReleasedProductsDataEventSubscriber
  13.     {
  14.         private EventingBasicConsumer consumer;
  15.         private QueueOptions queueOptions;
  16.         private string consumerTag;
  17.         private IModel channel;
  18.         private IProductRepository productRepository;
  19.  
  20.         public AMQPReleasedProductsDataEventSubscriber(IOptions<QueueOptions> queueOptions,
  21.             EventingBasicConsumer consumer, IProductRepository productRepository)
  22.         {
  23.             this.queueOptions = queueOptions.Value;
  24.             this.consumer = consumer;
  25.  
  26.             this.channel = consumer.Model;
  27.             this.productRepository = productRepository;
  28.  
  29.             Initialize();
  30.         }
  31.  
  32.         private void Initialize()
  33.         {
  34.             channel.QueueDeclare(
  35.                 queue: queueOptions.ReleasedProductsDataEventQueueName,
  36.                 durable: false,
  37.                 exclusive: false,
  38.                 autoDelete: false,
  39.                 arguments: null
  40.             );
  41.  
  42.             consumer.Received += (ch, ea) =>
  43.             {
  44.                 var body = ea.Body;
  45.                 string msg = Encoding.UTF8.GetString(body);
  46.                 ProductDetails productDetails = JsonConvert.DeserializeObject<ProductDetails>(msg);
  47.           //      productRepository.UpdateProductsAmount(productDetails);
  48.  
  49.                 channel.BasicAck(ea.DeliveryTag, false);
  50.             };
  51.         }
  52.  
  53.         public void Subscribe()
  54.         {
  55.             consumerTag = channel.BasicConsume(queueOptions.ReleasedProductsDataEventQueueName, false, consumer);
  56.         }
  57.  
  58.         public void Unsubscribe()
  59.         {
  60.             channel.BasicCancel(consumerTag);
  61.         }
  62.     }
  63. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement