Guest User

Untitled

a guest
Mar 24th, 2018
76
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.32 KB | None | 0 0
  1. @Test
  2. public void testDelayedExecutionSequence() {
  3. final Queue<List<Integer>> inQueue = new LinkedBlockingQueue<>();
  4. MessageSource<List<Integer>> inMs = new AbstractMessageSource<List<Integer>>() {
  5. @Override
  6. public String getComponentType() {
  7. return null;
  8. }
  9.  
  10. @Override
  11. protected Object doReceive() {
  12. return inQueue.poll();
  13. }
  14. };
  15.  
  16. final int messagesPerStep = 100;
  17. final int maxIterations = 10;
  18. for(int iteration = 1, from=1, to=messagesPerStep; iteration <= maxIterations; iteration++, from += messagesPerStep, to += messagesPerStep) {
  19. System.out.println(String.format("add list from=%d, to=%d", from, to));
  20. inQueue.add(IntStream.rangeClosed(from, to).boxed().collect(Collectors.toList()));
  21. }
  22.  
  23. final AtomicInteger amqpSendCounter = new AtomicInteger();
  24. final AtomicInteger iterationCounter = new AtomicInteger();
  25. final List<Integer> resultSequence = new ArrayList();
  26.  
  27. IntegrationFlow integrationFlow = IntegrationFlows
  28. .from(inMs, c->c.poller(Pollers.fixedDelay(50).maxMessagesPerPoll(1)))
  29. .split()
  30. .channel(c -> c.executor(Executors.newFixedThreadPool(10)))
  31. .<Integer, Integer>transform(p -> {
  32. if(p == 405) {
  33. try {
  34. Thread.currentThread().sleep(1000);
  35. } catch (InterruptedException e) {
  36. }
  37. }
  38. return p;})
  39. .handle((p, h) -> {amqpSendCounter.incrementAndGet(); return p;})
  40. .aggregate()
  41. .log(l -> "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! after agregate(): "+l)
  42. .<List<Integer>>handle(m -> {iterationCounter.incrementAndGet();
  43. Integer firstIntInList = ((List<Integer>)m.getPayload()).get(0);
  44. resultSequence.add(firstIntInList / 100);
  45. })
  46. .get();
  47.  
  48. IntegrationFlowRegistration registration = this.flowContext.registration(integrationFlow).register();
  49.  
  50. try {
  51. Thread.currentThread().sleep(3000);
  52. } catch (InterruptedException e) {
  53. }
  54.  
  55. assertThat(iterationCounter.get()).as("iterationCounter.get").isEqualTo(maxIterations);
  56. assertThat(resultSequence).as("result sequence").isEqualTo(IntStream.rangeClosed(0, 9).boxed().collect(Collectors.toList()));
  57. }
Add Comment
Please, Sign In to add comment