Guest User

Patch

a guest
Jul 14th, 2020
52
0
Never
Not a member of Pastebin yet? Sign up; it unlocks many cool features!
Java 6.14 KB | None | 0 0
  1. Index: src/main/java/com/chlegou/demos/rqueue/rqueue/TestListener.java
  2. IDEA additional info:
  3. Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
  4. <+>UTF-8
  5. ===================================================================
  6. --- src/main/java/com/chlegou/demos/rqueue/rqueue/TestListener.java (revision 24686b1a8d61b1bf95db017bdf9c30041db1c523)
  7. +++ src/main/java/com/chlegou/demos/rqueue/rqueue/TestListener.java (date 1594737172823)
  8. @@ -1,18 +1,27 @@
  9.  package com.chlegou.demos.rqueue.rqueue;
  10.  
  11. +import com.chlegou.demos.rqueue.domain.messages.SimpleMessage;
  12.  import com.github.sonus21.rqueue.annotation.RqueueListener;
  13. +import java.util.List;
  14. +import java.util.concurrent.CopyOnWriteArrayList;
  15.  import lombok.extern.slf4j.Slf4j;
  16. -import org.springframework.scheduling.annotation.Async;
  17.  import org.springframework.stereotype.Component;
  18.  
  19.  @Component
  20.  @Slf4j
  21.  public class TestListener {
  22.  
  23. +    public List<Integer> ids = new CopyOnWriteArrayList<>();
  24. +    public List<Integer> simpleIds = new CopyOnWriteArrayList<>();
  25.  
  26.      @RqueueListener(value = "${rqueue.test.echo}")
  27. -    public void echoQueue(String message) {
  28. +    public void echoQueue(SimpleMessage message) {
  29.          log.info("echo delayed message: {}", message);
  30. +        if (message.isDelayed()) {
  31. +            ids.add(message.getId());
  32. +        } else {
  33. +            simpleIds.add(message.getId());
  34. +        }
  35.      }
  36.  
  37.  
  38. Index: src/main/java/com/chlegou/demos/rqueue/web/rest/TestResource.java
  39. IDEA additional info:
  40. Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
  41. <+>UTF-8
  42. ===================================================================
  43. --- src/main/java/com/chlegou/demos/rqueue/web/rest/TestResource.java   (revision 24686b1a8d61b1bf95db017bdf9c30041db1c523)
  44. +++ src/main/java/com/chlegou/demos/rqueue/web/rest/TestResource.java   (date 1594737172811)
  45. @@ -1,8 +1,16 @@
  46.  package com.chlegou.demos.rqueue.web.rest;
  47.  
  48.  
  49. +import com.chlegou.demos.rqueue.domain.messages.SimpleMessage;
  50. +import com.chlegou.demos.rqueue.rqueue.TestListener;
  51.  import com.github.sonus21.rqueue.core.RqueueMessageSender;
  52. +import java.util.Collections;
  53. +import java.util.HashMap;
  54. +import java.util.Map;
  55. +import java.util.concurrent.TimeUnit;
  56. +import java.util.concurrent.atomic.AtomicInteger;
  57.  import lombok.extern.slf4j.Slf4j;
  58. +import org.springframework.beans.factory.annotation.Autowired;
  59.  import org.springframework.beans.factory.annotation.Value;
  60.  import org.springframework.http.HttpStatus;
  61.  import org.springframework.http.ResponseEntity;
  62. @@ -11,42 +19,54 @@
  63.  import org.springframework.web.bind.annotation.RequestMapping;
  64.  import org.springframework.web.bind.annotation.RestController;
  65.  
  66. -import java.util.HashMap;
  67. -import java.util.Map;
  68. -import java.util.concurrent.TimeUnit;
  69. -
  70.  @RestController
  71.  @RequestMapping("/api/test")
  72.  @Slf4j
  73.  public class TestResource {
  74.  
  75.      private RqueueMessageSender rqueueMessageSender;
  76. +    private TestListener testListener;
  77. +
  78.  
  79.      @Value("${rqueue.test.echo}")
  80.      private String echoQueue;
  81.      @Value("${rqueue.test.echo-timespan}")
  82.      private long echoTimespan;
  83. +    private final AtomicInteger counter = new AtomicInteger();
  84.  
  85.  
  86. -    public TestResource(RqueueMessageSender rqueueMessageSender) {
  87. +    @Autowired
  88. +    public TestResource(RqueueMessageSender rqueueMessageSender, TestListener testListener) {
  89.          this.rqueueMessageSender = rqueueMessageSender;
  90. +        this.testListener = testListener;
  91.      }
  92.  
  93.  
  94.      @GetMapping("/echo/{message}")
  95. -    public ResponseEntity<Map<String, String>> postMessage(@PathVariable String message) {
  96. +    public ResponseEntity<Map<String, Object>> postMessage(@PathVariable String message) {
  97.          log.info("echo instant message: {}", message);
  98. -        Map<String, String> result = new HashMap<>();
  99. -        result.put("message", message);
  100. -
  101. -        return ResponseEntity.status(HttpStatus.OK).body(result);
  102. +        SimpleMessage simpleMessage = new SimpleMessage(counter.incrementAndGet(), message, false);
  103. +        rqueueMessageSender.enqueue(echoQueue, simpleMessage);
  104. +        return ResponseEntity.status(HttpStatus.OK)
  105. +            .body(Collections.singletonMap("message", simpleMessage));
  106.      }
  107.  
  108.  
  109.      @GetMapping("/echo/delay/{message}")
  110. -    public ResponseEntity<Void> informNextRoll(@PathVariable String message) {
  111. +    public ResponseEntity<Map<String, Object>> informNextRoll(@PathVariable String message) {
  112.          log.info("queuing delayed message: {}", message);
  113. -        rqueueMessageSender.enqueueIn(echoQueue, message, echoTimespan, TimeUnit.SECONDS);
  114. -        return ResponseEntity.status(HttpStatus.OK).build();
  115. +        SimpleMessage simpleMessage = new SimpleMessage(counter.incrementAndGet(), message, true);
  116. +        rqueueMessageSender.enqueueIn(echoQueue, simpleMessage, echoTimespan, TimeUnit.SECONDS);
  117. +        return ResponseEntity.status(HttpStatus.OK)
  118. +            .body(Collections.singletonMap("message", simpleMessage));
  119. +    }
  120. +
  121. +    @GetMapping("/consumed-messages")
  122. +    public ResponseEntity<Map<String, Object>> getConsumedMessage() {
  123. +        Map<String, Object> result = new HashMap<>();
  124. +        result.put("delayedIds", testListener.ids);
  125. +        result.put("simpleIds", testListener.simpleIds);
  126. +        return ResponseEntity.status(HttpStatus.OK)
  127. +            .body(result);
  128.      }
  129.  }
  130. Index: src/main/java/com/chlegou/demos/rqueue/domain/messages/SimpleMessage.java
  131. IDEA additional info:
  132. Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
  133. <+>UTF-8
  134. ===================================================================
  135. --- src/main/java/com/chlegou/demos/rqueue/domain/messages/SimpleMessage.java   (date 1594737344034)
  136. +++ src/main/java/com/chlegou/demos/rqueue/domain/messages/SimpleMessage.java   (date 1594737344034)
  137. @@ -0,0 +1,19 @@
  138. +package com.chlegou.demos.rqueue.domain.messages;
  139. +
  140. +import lombok.AllArgsConstructor;
  141. +import lombok.Getter;
  142. +import lombok.NoArgsConstructor;
  143. +import lombok.Setter;
  144. +import lombok.ToString;
  145. +
  146. +@AllArgsConstructor
  147. +@NoArgsConstructor
  148. +@Getter
  149. +@Setter
  150. +@ToString
  151. +public class SimpleMessage {
  152. +
  153. +    private Integer id;
  154. +    private String message;
  155. +    private boolean delayed;
  156. +}
Add Comment
Please sign in to add a comment.