Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- @Test
- public void testDelayedExecutionSequence() {
- final Queue<List<Integer>> inQueue = new LinkedBlockingQueue<>();
- MessageSource<List<Integer>> inMs = new AbstractMessageSource<List<Integer>>() {
- @Override
- public String getComponentType() {
- return null;
- }
- @Override
- protected Object doReceive() {
- return inQueue.poll();
- }
- };
- final int messagesPerStep = 100;
- final int maxIterations = 10;
- for(int iteration = 1, from=1, to=messagesPerStep; iteration <= maxIterations; iteration++, from += messagesPerStep, to += messagesPerStep) {
- System.out.println(String.format("add list from=%d, to=%d", from, to));
- inQueue.add(IntStream.rangeClosed(from, to).boxed().collect(Collectors.toList()));
- }
- final AtomicInteger amqpSendCounter = new AtomicInteger();
- final AtomicInteger iterationCounter = new AtomicInteger();
- final List<Integer> resultSequence = new ArrayList();
- IntegrationFlow integrationFlow = IntegrationFlows
- .from(inMs, c->c.poller(Pollers.fixedDelay(50).maxMessagesPerPoll(1)))
- .split()
- .channel(c -> c.executor(Executors.newFixedThreadPool(10)))
- .<Integer, Integer>transform(p -> {
- if(p == 405) {
- try {
- Thread.currentThread().sleep(1000);
- } catch (InterruptedException e) {
- }
- }
- return p;})
- .handle((p, h) -> {amqpSendCounter.incrementAndGet(); return p;})
- .aggregate()
- .log(l -> "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! after agregate(): "+l)
- .<List<Integer>>handle(m -> {iterationCounter.incrementAndGet();
- Integer firstIntInList = ((List<Integer>)m.getPayload()).get(0);
- resultSequence.add(firstIntInList / 100);
- })
- .get();
- IntegrationFlowRegistration registration = this.flowContext.registration(integrationFlow).register();
- try {
- Thread.currentThread().sleep(3000);
- } catch (InterruptedException e) {
- }
- assertThat(iterationCounter.get()).as("iterationCounter.get").isEqualTo(maxIterations);
- assertThat(resultSequence).as("result sequence").isEqualTo(IntStream.rangeClosed(0, 9).boxed().collect(Collectors.toList()));
- }
Add Comment
Please, Sign In to add comment