Advertisement
tpeierls

Sample use of ConcurrentCacheLoader

Mar 10th, 2012
773
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 5 5.32 KB | None | 0 0
  1. // This is a snippet illustrating the use of ConcurrentCacheLoader. It digs
  2. // progressively deeper into the call until we get to the reason that we
  3. // want to make this a distributed operation in the first place.
  4.  
  5.     protected Map<String, AccountInfo> getAccountInfoIndex(
  6.             Multimap<String, String> accountsByServer,
  7.             Function<LegacyAccount, AccountInfo> op) {
  8.  
  9.         // We create the cache with our concurrent loader here.
  10.  
  11.         LoadingCache<String, AccountInfo> cache = CacheBuilder.newBuilder().build(
  12.             ConcurrentCacheLoader.<String, AccountInfo>newBuilder()
  13.                 .using(hazelcast().getExecutorService())
  14.                 .timeLimit(30L, TimeUnit.SECONDS)
  15.                 .byKeyFunction(new Function<String, Object>() {
  16.                     public Object apply(String account) {
  17.                         return accountToServer(account).or(account);
  18.                     }
  19.                 })
  20.                 .build(new Fun(op))
  21.         );
  22.  
  23.         // We get a list of account names here. Nothing special about this.
  24.  
  25.         List<String> accounts = Lists.newArrayListWithCapacity(accountsByServer.size());
  26.         for (String server : accountsByServer.keySet()) {
  27.             if (serverToContainer(server).isPresent()) {
  28.                 accounts.addAll(accountsByServer.get(server));
  29.             }
  30.         }
  31.  
  32.         try {
  33.  
  34.             // We get all the values for accounts here. This turns into
  35.             // a bunch of DistributedTask calls.
  36.  
  37.             return cache.getAll(accounts);
  38.  
  39.         } catch (ExecutionException ex) {
  40.  
  41.             // In this case, we swallow the execution exception
  42.             // and just report that we didn't get any results.
  43.  
  44.             return ImmutableMap.of();
  45.         }
  46.     }
  47.  
  48.     // This is the Function we're using to convert account names into
  49.     // AccountInfo objects. It converts an account name to a LegacyAccount,
  50.     // whatever that is, and then calls another Function, below to turn
  51.     // the LegacyAccount into an AccountInfo.
  52.  
  53.     static class Fun implements Function<String, AccountInfo>, java.io.Serializable {
  54.  
  55.         public AccountInfo apply(String accountName) {
  56.             String containerName =
  57.                 serverToContainer(accountToServer(accountName).get()).get();
  58.             LegacyContainer container = new LegacyContainer(blobStore, containerName);
  59.             Optional<LegacyAccount> account = container.getAccount(accountName);
  60.             return account.isPresent() ? op.apply(account.get()) : null;
  61.         }
  62.  
  63.         Fun(Function<LegacyAccount, AccountInfo> op) {
  64.             checkArgument(op instanceof java.io.Serializable,
  65.                  "Function must be serializable");
  66.             this.op = op;
  67.         }
  68.  
  69.         private final Function<LegacyAccount, AccountInfo> op;
  70.     }
  71.  
  72.  
  73.     // This is the Function passed to Fun, above. The real work
  74.     // is done by getWebstoreProperties.
  75.  
  76.     static class ProcessAccount implements Function<LegacyAccount, AccountInfo>, java.io.Serializable {
  77.         public AccountInfo apply(LegacyAccount account) {
  78.             String accountName = account.account();
  79.             AccountInfo accountInfo = new AccountInfo(accountName, accountToServer(accountName).get());
  80.             accountInfo.webstoreProperties = getWebstoreProperties(account);
  81.             return accountInfo;
  82.         }
  83.     }
  84.  
  85.     // This is the real work of ProcessAccount, and the most
  86.     // expensive part of it is getting the document in the
  87.     // first line.
  88.  
  89.     protected static Map<String, String> getWebstoreProperties(LegacyAccount account) {
  90.         Map<String, String> result = Maps.newHashMap();
  91.  
  92.         Optional<Document> wsProps = account.getDocument(WEBSTORE_PROPS);
  93.         if (wsProps.isPresent()) {
  94.             try {
  95.                 for (Node node : PROP_XPATH.evaluate(wsProps.get())) {
  96.                     String name = NAME_XPATH.evaluateAsString(node);
  97.                     String value = VALUE_XPATH.evaluateAsString(node);
  98.                     result.put(name, value);
  99.                 }
  100.             } catch (RuntimeException ex) {
  101.                 logger.info("Trouble reading properties for account {}", account.account());
  102.             }
  103.         }
  104.  
  105.         return ImmutableSortedMap.copyOf(result);
  106.     }
  107.  
  108.     // Ultimately, the call to getDocument turns into this code.
  109.     // It's that call to BlobStore.getBlob that we want to distribute
  110.     // over all the nodes in the cluster.
  111.  
  112.     Optional<Document> getDocument(String name, Charset charset) {
  113.         try {
  114.             Blob blob = blobStore().getBlob(this.container, name);
  115.             if (blob == null) {
  116.                 logger.info("No such element: {}/{}", this.container, name);
  117.                 return Optional.absent();
  118.             }
  119.             InputSource in = new InputSource(skipHeader(blob.getPayload().getInput()));
  120.             in.setEncoding(charset.name());
  121.             return Optional.of(ParseUtil.parse(in));
  122.         } catch (IOException ex) {
  123.             logger.warn(String.format("Can't read: %s/%s", container, name), ex);
  124.             return Optional.absent();
  125.         } catch (RuntimeException ex) {         // NOPMD
  126.             logger.warn(String.format("Unexpected runtime exception for %s/%s", container, name), ex);
  127.             return Optional.absent();
  128.         }
  129.     }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement