Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- // This is a snippet illustrating the use of ConcurrentCacheLoader. It digs
- // progressively deeper into the call until we get to the reason that we
- // want to make this a distributed operation in the first place.
- protected Map<String, AccountInfo> getAccountInfoIndex(
- Multimap<String, String> accountsByServer,
- Function<LegacyAccount, AccountInfo> op) {
- // We create the cache with our concurrent loader here.
- LoadingCache<String, AccountInfo> cache = CacheBuilder.newBuilder().build(
- ConcurrentCacheLoader.<String, AccountInfo>newBuilder()
- .using(hazelcast().getExecutorService())
- .timeLimit(30L, TimeUnit.SECONDS)
- .byKeyFunction(new Function<String, Object>() {
- public Object apply(String account) {
- return accountToServer(account).or(account);
- }
- })
- .build(new Fun(op))
- );
- // We get a list of account names here. Nothing special about this.
- List<String> accounts = Lists.newArrayListWithCapacity(accountsByServer.size());
- for (String server : accountsByServer.keySet()) {
- if (serverToContainer(server).isPresent()) {
- accounts.addAll(accountsByServer.get(server));
- }
- }
- try {
- // We get all the values for accounts here. This turns into
- // a bunch of DistributedTask calls.
- return cache.getAll(accounts);
- } catch (ExecutionException ex) {
- // In this case, we swallow the execution exception
- // and just report that we didn't get any results.
- return ImmutableMap.of();
- }
- }
- // This is the Function we're using to convert account names into
- // AccountInfo objects. It converts an account name to a LegacyAccount,
- // whatever that is, and then calls another Function, below to turn
- // the LegacyAccount into an AccountInfo.
- static class Fun implements Function<String, AccountInfo>, java.io.Serializable {
- public AccountInfo apply(String accountName) {
- String containerName =
- serverToContainer(accountToServer(accountName).get()).get();
- LegacyContainer container = new LegacyContainer(blobStore, containerName);
- Optional<LegacyAccount> account = container.getAccount(accountName);
- return account.isPresent() ? op.apply(account.get()) : null;
- }
- Fun(Function<LegacyAccount, AccountInfo> op) {
- checkArgument(op instanceof java.io.Serializable,
- "Function must be serializable");
- this.op = op;
- }
- private final Function<LegacyAccount, AccountInfo> op;
- }
- // This is the Function passed to Fun, above. The real work
- // is done by getWebstoreProperties.
- static class ProcessAccount implements Function<LegacyAccount, AccountInfo>, java.io.Serializable {
- public AccountInfo apply(LegacyAccount account) {
- String accountName = account.account();
- AccountInfo accountInfo = new AccountInfo(accountName, accountToServer(accountName).get());
- accountInfo.webstoreProperties = getWebstoreProperties(account);
- return accountInfo;
- }
- }
- // This is the real work of ProcessAccount, and the most
- // expensive part of it is getting the document in the
- // first line.
- protected static Map<String, String> getWebstoreProperties(LegacyAccount account) {
- Map<String, String> result = Maps.newHashMap();
- Optional<Document> wsProps = account.getDocument(WEBSTORE_PROPS);
- if (wsProps.isPresent()) {
- try {
- for (Node node : PROP_XPATH.evaluate(wsProps.get())) {
- String name = NAME_XPATH.evaluateAsString(node);
- String value = VALUE_XPATH.evaluateAsString(node);
- result.put(name, value);
- }
- } catch (RuntimeException ex) {
- logger.info("Trouble reading properties for account {}", account.account());
- }
- }
- return ImmutableSortedMap.copyOf(result);
- }
- // Ultimately, the call to getDocument turns into this code.
- // It's that call to BlobStore.getBlob that we want to distribute
- // over all the nodes in the cluster.
- Optional<Document> getDocument(String name, Charset charset) {
- try {
- Blob blob = blobStore().getBlob(this.container, name);
- if (blob == null) {
- logger.info("No such element: {}/{}", this.container, name);
- return Optional.absent();
- }
- InputSource in = new InputSource(skipHeader(blob.getPayload().getInput()));
- in.setEncoding(charset.name());
- return Optional.of(ParseUtil.parse(in));
- } catch (IOException ex) {
- logger.warn(String.format("Can't read: %s/%s", container, name), ex);
- return Optional.absent();
- } catch (RuntimeException ex) { // NOPMD
- logger.warn(String.format("Unexpected runtime exception for %s/%s", container, name), ex);
- return Optional.absent();
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement