Advertisement
blaff23

Untitled

Dec 13th, 2017
70
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 2.23 KB | None | 0 0
  1. package com.exp.components.retry.impl;
  2.  
  3. import com.exp.components.entity.Message;
  4. import com.exp.components.retry.Retrier;
  5. import org.slf4j.Logger;
  6. import org.slf4j.LoggerFactory;
  7.  
  8. import java.util.List;
  9. import java.util.UUID;
  10. import java.util.concurrent.ConcurrentHashMap;
  11. import java.util.function.Function;
  12. import java.util.stream.Collectors;
  13.  
  14. public class SenderRetrier implements Retrier {
  15.     private static final Logger LOG = LoggerFactory.getLogger(SenderRetrier.class);
  16.  
  17.     private ConcurrentHashMap<UUID, Message> pendingMessages;
  18.     private ConcurrentHashMap<UUID, Integer> retryMap;
  19.     private int maxRetries;
  20.  
  21.     public SenderRetrier(int maxRetries) {
  22.         this.maxRetries = maxRetries;
  23.         this.pendingMessages = new ConcurrentHashMap<>();
  24.         this.retryMap = new ConcurrentHashMap<>();
  25.     }
  26.  
  27.     @Override
  28.     public Message retry(UUID msgId) {
  29.         Message message = pendingMessages.get(msgId);
  30.         if (message == null) {
  31.             return null;
  32.         }
  33.  
  34.         Integer retryNum = retryMap.get(msgId);
  35.         if (retryNum >= maxRetries) {
  36.             LOG.info("Reached max retries: {}, assume message: {} is poison", retryNum, msgId);
  37.             deletePendingMessage(msgId);
  38.             return null;
  39.         }
  40.         LOG.info("Try resend message: {}, current retries: {}", msgId, retryNum);
  41.         retryMap.replace(msgId, ++retryNum);
  42.         return message;
  43.     }
  44.  
  45.     @Override
  46.     public void deletePendingMessage(UUID msgId) {
  47.         pendingMessages.remove(msgId);
  48.         retryMap.remove(msgId);
  49.     }
  50.  
  51.     @Override
  52.     public void deletePendingMessages(List<UUID> uuidList) {
  53.         uuidList.forEach(id -> {
  54.             pendingMessages.remove(id);
  55.             retryMap.remove(id);
  56.         });
  57.     }
  58.  
  59.     @Override
  60.     public void addPendingMessages(List<Message> messages) {
  61.         pendingMessages.putAll(messages.stream().collect(Collectors.toMap(Message::getId, Function.identity())));
  62.         retryMap.putAll(messages.stream().collect(Collectors.toMap(Message::getId, m -> 1)));
  63.     }
  64.  
  65.     @Override
  66.     public void addPendingMessage(Message message) {
  67.         pendingMessages.put(message.getId(), message);
  68.         retryMap.put(message.getId(), 1);
  69.     }
  70. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement