import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Phaser;
import java.util.concurrent.StructuredTaskScope;
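
// A web-crawler demo built on structured concurrency: a custom
// StructuredTaskScope.Joiner aggregates per-page results while a Phaser
// tracks how many subtasks are still in flight, so the main thread knows
// when the dynamically growing crawl frontier has been fully drained.
// Assumes JDK 25: StructuredTaskScope and its Joiner are a preview API
// there (JEP 505), and the no-arg main() plus java.lang.IO come from
// compact source files (JEP 512).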

record PageCrawlResult(String url, int linkCount) {}

class FinalAggregatingJoiner implements
        StructuredTaskScope.Joiner<PageCrawlResult, List<PageCrawlResult>> {
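    // Party protocol: the main thread is the Phaser's initial party. The
    // main loop registers one extra party per forked subtask, and each
    // subtask's party is deregistered in onComplete(), so the registered
    // count is always 1 + the number of crawls still in flight.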
    private final Phaser phaser = new Phaser(1); // main thread is the initial party
    private final Queue<PageCrawlResult> allResults = new ConcurrentLinkedQueue<>();
    private final Queue<String> pendingUrls = new ConcurrentLinkedQueue<>();

    public Phaser getPhaser() {
        return phaser;
    }

    public String pollUrlToCrawl() {
        return pendingUrls.poll();
    }

    public void submitNewUrl(String url) {
        pendingUrls.add(url);
    }
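
    // Work remains while the frontier queue is non-empty or at least one
    // worker party (beyond the main thread) is still registered.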
    public boolean isWorkOngoing() {
        return !pendingUrls.isEmpty() || phaser.getRegisteredParties() > 1;
    }
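
    // Parks the main thread until every currently registered worker has
    // arrived (i.e. deregistered in onComplete), advancing the phase.
    // Returns immediately if no workers are in flight.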
    public void waitForActiveTasks() {
        if (phaser.getRegisteredParties() > 1) {
            phaser.arriveAndAwaitAdvance();
        }
    }
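
    // Joiner callback, invoked once per subtask as it completes. Successful
    // results are collected; failed subtasks are silently dropped. Either
    // way the subtask's party is deregistered so the main loop can see the
    // crawl is no longer in flight. Returning false keeps the scope open;
    // per the Joiner contract, returning true would request cancellation.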
    @Override
    public boolean onComplete(StructuredTaskScope.Subtask<? extends PageCrawlResult> subtask) {
        try {
            if (subtask.state() == StructuredTaskScope.Subtask.State.SUCCESS) {
                allResults.add(subtask.get());
            }
        } finally {
            phaser.arriveAndDeregister();
        }
        return false;
    }
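
    // Called by scope.join() once the main loop has finished forking: the
    // main thread deregisters its own party, which terminates the Phaser,
    // and a snapshot of the collected results is returned.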
    @Override
    public List<PageCrawlResult> result() {
        phaser.arriveAndDeregister();
        return allResults.stream().toList();
    }
}

public class PhaserCrawler {

    // Worker only needs the joiner for submitting new work.
    PageCrawlResult doCrawl(String url, FinalAggregatingJoiner joiner) {
        int linksFound;
        // Simulate fetching the URL, extracting its links, and submitting them
        if (url.equals("rootUrl")) {
            joiner.submitNewUrl("url1");
            joiner.submitNewUrl("url2");
            linksFound = 2;
        } else if (url.equals("url1")) {
            joiner.submitNewUrl("url3");
            linksFound = 1;
        } else {
            linksFound = 0;
        }
        return new PageCrawlResult(url, linksFound);
    }
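
    // Coordination loop: poll the frontier, and for each unvisited URL
    // register a Phaser party *before* forking, so the in-flight count can
    // never be observed as zero while a crawl is pending. When the frontier
    // is empty but workers are still running, park on the Phaser until they
    // finish (they may have enqueued new URLs in the meantime).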
    public List<PageCrawlResult> crawl(String rootUrl) {
        var visited = Collections.synchronizedSet(new HashSet<String>());
        var joiner = new FinalAggregatingJoiner();
        joiner.submitNewUrl(rootUrl);
        try (var scope = StructuredTaskScope.open(joiner)) {
            Phaser phaser = joiner.getPhaser();
            while (joiner.isWorkOngoing()) {
                String url = joiner.pollUrlToCrawl();
                if (url == null) {
                    joiner.waitForActiveTasks();
                } else if (visited.add(url)) {
                    phaser.register();
                    scope.fork(() -> doCrawl(url, joiner));
                }
            }
            return scope.join();
        } catch (Exception e) {
            throw new RuntimeException("Crawl failed", e);
        }
    }

    static void main() {
        PhaserCrawler crawler = new PhaserCrawler();
        List<PageCrawlResult> results = crawler.crawl("rootUrl");
        IO.println("Crawl complete. Total pages crawled: " + results.size());
        results.forEach(IO::println);
    }
}
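
// To run (assuming JDK 25, where structured concurrency is still preview):
//   java --enable-preview PhaserCrawler.java
// With the simulated links above, this should report four crawled pages:
// rootUrl, url1, url2, and url3.
//
// A possible variant, not part of the original paste: to abort the whole
// crawl when any page fails, rather than silently dropping the failure,
// onComplete could add (before the SUCCESS check; the finally block still
// deregisters the party):
//
//   if (subtask.state() == StructuredTaskScope.Subtask.State.FAILED) {
//       return true; // asks the scope to cancel outstanding subtasks
//   }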