Not a member of Pastebin yet? Sign up; it unlocks many cool features!
- Index: src/main/java/com/chlegou/demos/rqueue/rqueue/TestListener.java
- IDEA additional info:
- Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
- <+>UTF-8
- ===================================================================
- --- src/main/java/com/chlegou/demos/rqueue/rqueue/TestListener.java (revision 24686b1a8d61b1bf95db017bdf9c30041db1c523)
- +++ src/main/java/com/chlegou/demos/rqueue/rqueue/TestListener.java (date 1594737172823)
- @@ -1,18 +1,27 @@
- package com.chlegou.demos.rqueue.rqueue;
- +import com.chlegou.demos.rqueue.domain.messages.SimpleMessage;
- import com.github.sonus21.rqueue.annotation.RqueueListener;
- +import java.util.List;
- +import java.util.concurrent.CopyOnWriteArrayList;
- import lombok.extern.slf4j.Slf4j;
- -import org.springframework.scheduling.annotation.Async;
- import org.springframework.stereotype.Component;
- @Component
- @Slf4j
- public class TestListener {
- + public List<Integer> ids = new CopyOnWriteArrayList<>();
- + public List<Integer> simpleIds = new CopyOnWriteArrayList<>();
- @RqueueListener(value = "${rqueue.test.echo}")
- - public void echoQueue(String message) {
- + public void echoQueue(SimpleMessage message) {
- log.info("echo delayed message: {}", message);
- + if (message.isDelayed()) {
- + ids.add(message.getId());
- + } else {
- + simpleIds.add(message.getId());
- + }
- }
- Index: src/main/java/com/chlegou/demos/rqueue/web/rest/TestResource.java
- IDEA additional info:
- Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
- <+>UTF-8
- ===================================================================
- --- src/main/java/com/chlegou/demos/rqueue/web/rest/TestResource.java (revision 24686b1a8d61b1bf95db017bdf9c30041db1c523)
- +++ src/main/java/com/chlegou/demos/rqueue/web/rest/TestResource.java (date 1594737172811)
- @@ -1,8 +1,16 @@
- package com.chlegou.demos.rqueue.web.rest;
- +import com.chlegou.demos.rqueue.domain.messages.SimpleMessage;
- +import com.chlegou.demos.rqueue.rqueue.TestListener;
- import com.github.sonus21.rqueue.core.RqueueMessageSender;
- +import java.util.Collections;
- +import java.util.HashMap;
- +import java.util.Map;
- +import java.util.concurrent.TimeUnit;
- +import java.util.concurrent.atomic.AtomicInteger;
- import lombok.extern.slf4j.Slf4j;
- +import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.http.HttpStatus;
- import org.springframework.http.ResponseEntity;
- @@ -11,42 +19,54 @@
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
- -import java.util.HashMap;
- -import java.util.Map;
- -import java.util.concurrent.TimeUnit;
- -
- @RestController
- @RequestMapping("/api/test")
- @Slf4j
- public class TestResource {
- private RqueueMessageSender rqueueMessageSender;
- + private TestListener testListener;
- +
- @Value("${rqueue.test.echo}")
- private String echoQueue;
- @Value("${rqueue.test.echo-timespan}")
- private long echoTimespan;
- + private final AtomicInteger counter = new AtomicInteger();
- - public TestResource(RqueueMessageSender rqueueMessageSender) {
- + @Autowired
- + public TestResource(RqueueMessageSender rqueueMessageSender, TestListener testListener) {
- this.rqueueMessageSender = rqueueMessageSender;
- + this.testListener = testListener;
- }
- @GetMapping("/echo/{message}")
- - public ResponseEntity<Map<String, String>> postMessage(@PathVariable String message) {
- + public ResponseEntity<Map<String, Object>> postMessage(@PathVariable String message) {
- log.info("echo instant message: {}", message);
- - Map<String, String> result = new HashMap<>();
- - result.put("message", message);
- -
- - return ResponseEntity.status(HttpStatus.OK).body(result);
- + SimpleMessage simpleMessage = new SimpleMessage(counter.incrementAndGet(), message, false);
- + rqueueMessageSender.enqueue(echoQueue, simpleMessage);
- + return ResponseEntity.status(HttpStatus.OK)
- + .body(Collections.singletonMap("message", simpleMessage));
- }
- @GetMapping("/echo/delay/{message}")
- - public ResponseEntity<Void> informNextRoll(@PathVariable String message) {
- + public ResponseEntity<Map<String, Object>> informNextRoll(@PathVariable String message) {
- log.info("queuing delayed message: {}", message);
- - rqueueMessageSender.enqueueIn(echoQueue, message, echoTimespan, TimeUnit.SECONDS);
- - return ResponseEntity.status(HttpStatus.OK).build();
- + SimpleMessage simpleMessage = new SimpleMessage(counter.incrementAndGet(), message, true);
- + rqueueMessageSender.enqueueIn(echoQueue, simpleMessage, echoTimespan, TimeUnit.SECONDS);
- + return ResponseEntity.status(HttpStatus.OK)
- + .body(Collections.singletonMap("message", simpleMessage));
- + }
- +
- + @GetMapping("/consumed-messages")
- + public ResponseEntity<Map<String, Object>> getConsumedMessage() {
- + Map<String, Object> result = new HashMap<>();
- + result.put("delayedIds", testListener.ids);
- + result.put("simpleIds", testListener.simpleIds);
- + return ResponseEntity.status(HttpStatus.OK)
- + .body(result);
- }
- }
- Index: src/main/java/com/chlegou/demos/rqueue/domain/messages/SimpleMessage.java
- IDEA additional info:
- Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
- <+>UTF-8
- ===================================================================
- --- src/main/java/com/chlegou/demos/rqueue/domain/messages/SimpleMessage.java (date 1594737344034)
- +++ src/main/java/com/chlegou/demos/rqueue/domain/messages/SimpleMessage.java (date 1594737344034)
- @@ -0,0 +1,19 @@
- +package com.chlegou.demos.rqueue.domain.messages;
- +
- +import lombok.AllArgsConstructor;
- +import lombok.Getter;
- +import lombok.NoArgsConstructor;
- +import lombok.Setter;
- +import lombok.ToString;
- +
- +@AllArgsConstructor
- +@NoArgsConstructor
- +@Getter
- +@Setter
- +@ToString
- +public class SimpleMessage {
- +
- + private Integer id;
- + private String message;
- + private boolean delayed;
- +}
Add Comment
Please sign in to add a comment.