Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- public void start(){
- addStage("broadcast", broadcastStageQueue, broadcastStage);
- }
- private <T> void addStage(String name, BlockingQueue<ProcessingContext<T>> queue, ProcessingStage stage) {
- stagesThreadPool.submit(new Thread(() -> {
- try {
- while (!shutdown.get()) {
- ProcessingContext ctx = queue.take();
- ctx = stage.process(ctx);
- switch (ctx.getNextStage()) {
- case REPLY:
- replyStageQueue.put(ctx);
- break;
- case HASHING:
- hashAndValidate(ctx);
- break;
- case RECEIVED:
- receivedStageQueue.put(ctx);
- break;
- case MULTIPLE:
- ImmutablePair<ProcessingContext<ReplyPayload>,
- ProcessingContext<
- ReceivedPayload>> payload = (ImmutablePair<ProcessingContext<ReplyPayload>,
- ProcessingContext<ReceivedPayload>>) ctx.getPayload();
- replyStageQueue.put(payload.getLeft());
- receivedStageQueue.put(payload.getRight());
- break;
- case BROADCAST:
- broadcastStageQueue.put(ctx);
- break;
- case ABORT:
- break;
- }
- }
- } catch (InterruptedException e) {
- } finally {
- log.info("{} stage shutdown", name);
- }
- }, name + "-stage"));
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement