Guest User

PhaserStructuredConcurrency

a guest
Sep 26th, 2025
39
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 3.18 KB | Source Code | 0 0
  1. import java.util.Collections;
  2. import java.util.HashSet;
  3. import java.util.List;
  4. import java.util.Queue;
  5. import java.util.concurrent.ConcurrentLinkedQueue;
  6. import java.util.concurrent.Phaser;
  7. import java.util.concurrent.StructuredTaskScope;
  8.  
  /**
   * Immutable outcome of crawling a single page.
   *
   * @param url       the address of the page that was crawled
   * @param linkCount the number of links recorded for that page
   */
  record PageCrawlResult(String url, int linkCount) {}
  12.  
  13. class FinalAggregatingJoiner implements
  14.     StructuredTaskScope.Joiner<PageCrawlResult, List<PageCrawlResult>> {
  15.  
  16.   private final Phaser phaser = new Phaser(1); // Main thread is initial party
  17.   private final Queue<PageCrawlResult> allResults = new ConcurrentLinkedQueue<>();
  18.   private final Queue<String> pendingUrls = new ConcurrentLinkedQueue<>();
  19.  
  20.   public Phaser getPhaser() {
  21.     return phaser;
  22.   }
  23.  
  24.   public String pollUrlToCrawl() {
  25.     return pendingUrls.poll();
  26.   }
  27.  
  28.   public void submitNewUrl(String url) {
  29.     pendingUrls.add(url);
  30.   }
  31.  
  32.   public boolean isWorkOngoing() {
  33.     return !pendingUrls.isEmpty() || phaser.getRegisteredParties() > 1;
  34.   }
  35.  
  36.   public void waitForActiveTasks() {
  37.     if (phaser.getRegisteredParties() > 1) {
  38.       phaser.arriveAndAwaitAdvance();
  39.     }
  40.   }
  41.  
  42.   @Override
  43.   public boolean onComplete(StructuredTaskScope.Subtask<? extends PageCrawlResult> subtask) {
  44.     try {
  45.       if (StructuredTaskScope.Subtask.State.SUCCESS.equals(subtask.state())) {
  46.         allResults.add(subtask.get());
  47.       }
  48.     } finally {
  49.       phaser.arriveAndDeregister();
  50.     }
  51.     return false;
  52.   }
  53.  
  54.   @Override
  55.   public List<PageCrawlResult> result() {
  56.     phaser.arriveAndDeregister();
  57.     return allResults.stream().toList();
  58.   }
  59. }
  60.  
  61. public class PhaserCrawler {
  62.  
  63.   // Worker only needs the joiner for submitting new work.
  64.   PageCrawlResult doCrawl(String url, FinalAggregatingJoiner joiner) {
  65.  
  66.     int linksFound;
  67.  
  68.     // Simulate fetching URL, extraction, and submission
  69.     if (url.equals("rootUrl")) {
  70.       joiner.submitNewUrl("url1");
  71.       joiner.submitNewUrl("url2");
  72.       linksFound = 2;
  73.     } else if (url.equals("url1")) {
  74.       joiner.submitNewUrl("url3");
  75.       linksFound = 1;
  76.     } else {
  77.       linksFound = 0;
  78.     }
  79.  
  80.     return new PageCrawlResult(url, linksFound);
  81.   }
  82.  
  83.   public List<PageCrawlResult> crawl(String rootUrl) {
  84.  
  85.     var visited = Collections.synchronizedSet(new HashSet<String>());
  86.     var joiner = new FinalAggregatingJoiner();
  87.     joiner.submitNewUrl(rootUrl);
  88.     try (var scope = StructuredTaskScope.open(joiner)) {
  89.       Phaser phaser = joiner.getPhaser();
  90.  
  91.       while (joiner.isWorkOngoing()) {
  92.         String url = joiner.pollUrlToCrawl();
  93.         if (url != null) {
  94.           if (visited.add(url)) {
  95.             phaser.register();
  96.             scope.fork(() -> doCrawl(url, joiner));
  97.           }
  98.         }
  99.  
  100.         if (url == null) {
  101.           joiner.waitForActiveTasks();
  102.         }
  103.       }
  104.       return scope.join();
  105.  
  106.     } catch (Exception e) {
  107.       throw new RuntimeException("Crawl failed", e);
  108.     }
  109.   }
  110.  
  111.   static void main() throws InterruptedException {
  112.     PhaserCrawler crawler = new PhaserCrawler();
  113.     List<PageCrawlResult> results = crawler.crawl("rootUrl");
  114.     IO.println("Crawl complete. Total pages crawled: " + results.size());
  115.     results.forEach(IO::println);
  116.   }
  117. }
Advertisement
Add Comment
Please, Sign In to add comment