Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- //Create the connection factory
- var connectionFactory = new ConnectionFactory() {
- HostName = host,
- UserName = userName,
- Password = password
- };
- connectionFactory.RequestedHeartbeat = 10;
- connectionFactory.RequestedConnectionTimeout = 30000;
- connectionFactory.AutomaticRecoveryEnabled = true;
- connectionFactory.NetworkRecoveryInterval = TimeSpan.FromSeconds(10);
- //connection
- var connection = connectionFactory.CreateConnection();
- logger.Info($"Connected to RabbitMQ {host}");
- connection.ConnectionShutdown += Connection_ConnectionShutdown;
- var model = connection.CreateModel();
- model.BasicQos(0, 1, false);
- var consumer = new QueueingBasicConsumer(model);
- model.BasicConsume(queueName, false, consumer);
- while (true) {
- try {
- var deliveryArgs = consumer.Queue.Dequeue();
- model.BasicAck(deliveryArgs.DeliveryTag, false);
- var jsonString = Encoding.Default.GetString(deliveryArgs.Body);
- var itemtoprocess= jsonString.FromJson<recieved message>();
- if (deliveryArgs.Redelivered) {
- model.BasicReject(deliveryArgs.DeliveryTag, false);
- }
- else {
- var task = Task.Factory.StartNew(() => {
- //Do work here on different thread then this one
- //Call the churner to process the message
- //Some long running method here to process item recieve
- });
- Task.WaitAll(task);
- }
- }
- catch (EndOfStreamException ex) {
- //log
- }
- }
- channel.ExchangeDeclare (exchangeName, ExchangeType.Topic, true, false, null);
- //Queue
- var queue = channel.QueueDeclare (queueName, true, false, false, null );
- channel.QueueBind (queue.QueueName, exchangeName, routingKey);
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement