Advertisement
Brord

pipeline

Jun 6th, 2019
272
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.87 KB | None | 0 0
  1. public void start(){
  2. addStage("broadcast", broadcastStageQueue, broadcastStage);
  3. }
  4.  
  5. private <T> void addStage(String name, BlockingQueue<ProcessingContext<T>> queue, ProcessingStage stage) {
  6. stagesThreadPool.submit(new Thread(() -> {
  7. try {
  8. while (!shutdown.get()) {
  9. ProcessingContext ctx = queue.take();
  10. ctx = stage.process(ctx);
  11. switch (ctx.getNextStage()) {
  12. case REPLY:
  13. replyStageQueue.put(ctx);
  14. break;
  15. case HASHING:
  16. hashAndValidate(ctx);
  17. break;
  18. case RECEIVED:
  19. receivedStageQueue.put(ctx);
  20. break;
  21. case MULTIPLE:
  22. ImmutablePair<ProcessingContext<ReplyPayload>,
  23. ProcessingContext<
  24. ReceivedPayload>> payload = (ImmutablePair<ProcessingContext<ReplyPayload>,
  25. ProcessingContext<ReceivedPayload>>) ctx.getPayload();
  26. replyStageQueue.put(payload.getLeft());
  27. receivedStageQueue.put(payload.getRight());
  28. break;
  29. case BROADCAST:
  30. broadcastStageQueue.put(ctx);
  31. break;
  32. case ABORT:
  33. break;
  34. }
  35. }
  36. } catch (InterruptedException e) {
  37.  
  38. } finally {
  39. log.info("{} stage shutdown", name);
  40. }
  41. }, name + "-stage"));
  42. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement