Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- Future<Object> future = Patterns.ask(cacheActor, getMessage, Config.TIMEOUT_MILLIS);
- CompletionStage<Object> stage = FutureConverters.toJava(future);
- stage.thenCompose(resultMessage -> {
- System.out.println("ResultMessage " + ((ResultMessage) resultMessage).getURL() + " " + ((ResultMessage) resultMessage).getResponseTime() + " " + ((ResultMessage) resultMessage).isSuccess());
- if (((ResultMessage) resultMessage).isSuccess()) {
- return CompletableFuture.completedFuture(resultMessage);
- } else {
- return Source.from(Collections.singletonList(getMessage))
- .toMat(
- Flow.<GetMessage>create()
- .mapConcat(message -> Collections.nCopies(message.getCount(), message.getURL()))
- .mapAsync(1, URL -> {
- long millisNow = System.currentTimeMillis();
- return asyncHttpClient
- .prepareGet(URL)
- .execute()
- .toCompletableFuture()
- .thenCompose(f ->
- CompletableFuture.completedFuture(System.currentTimeMillis() - millisNow));
- })
- .toMat(Sink.fold(0L, Long::sum), Keep.right()),
- Keep.right()
- )
- .run(materializer)
- .thenCompose(time ->
- CompletableFuture.completedFuture(new ResultMessage(
- getMessage.getURL(),
- (time / getMessage.getCount()),
- false
- )));
- }
- }
- );
- return stage;
- })
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement