Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package com.example.hazelcast.mapstore;
- import com.hazelcast.core.*;
- import com.hazelcast.config.*;
- import java.util.*;
- import java.util.concurrent.*;
- import static java.util.stream.Collectors.*;
- /**
- * Minimal app to see if map loader/store in wildcard map config
- * is properly set up in all nodes.
- */
- public class MapStoreExample {
- /**
- * Just enough to see whether the right load methods are called.
- */
- static class TinyMapLoader implements MapLoader<String, String> {
- final String name;
- TinyMapLoader(String name, Properties props) {
- this.name = name;
- }
- public String load(String key) {
- return name + "-" + key;
- }
- public Map<String, String> loadAll(Collection<String> keys) {
- return keys.stream().collect(toMap(k -> k, k -> load(k)));
- }
- public Set<String> loadAllKeys() {
- return new HashSet<>(Arrays.asList("a", "b", "c"));
- }
- }
- final String instanceName;
- final int minNodes;
- volatile HazelcastInstance hz = null;
- volatile boolean initiator = false;
- MapStoreExample(String instanceName, int minNodes) {
- this.instanceName = instanceName;
- this.minNodes = minNodes;
- }
- public void whenEnoughNodes(Runnable task) throws InterruptedException {
- Config config = new Config(instanceName);
- config.getMapConfig("map-*").setMapStoreConfig(
- new MapStoreConfig()
- .setFactoryImplementation((MapStoreFactory) TinyMapLoader::new)
- );
- hz = Hazelcast.newHazelcastInstance(config);
- ITopic<Void> shutdownTopic = hz.getTopic("shutdown");
- shutdownTopic.addMessageListener(m -> Hazelcast.shutdownAll());
- ITopic<Void> taskTopic = hz.getTopic("task");
- taskTopic.addMessageListener(m -> {
- task.run();
- try {
- TimeUnit.SECONDS.sleep(1);
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- }
- if (initiator) {
- shutdownTopic.publish(null);
- }
- });
- hz.getCluster().addMembershipListener(new InitialMembershipListener() {
- public void init(InitialMembershipEvent event) {
- if (event.getMembers().size() >= minNodes) {
- System.out.printf("****** %s is initiator ******%n", instanceName);
- initiator = true;
- taskTopic.publish(null);
- }
- }
- public void memberAdded(MembershipEvent event) {}
- public void memberAttributeChanged(MemberAttributeEvent event) {}
- public void memberRemoved(MembershipEvent event) {}
- });
- }
- public void run() {
- dumpMap("map-rst");
- dumpMap("map-uvw");
- dumpMap("map-xyz");
- }
- void dumpMap(String mapName) {
- IMap<String, String> map = hz.getMap(mapName);
- for (String key : map.keySet()) {
- String value = map.get(key);
- System.out.printf(
- "****** %s: %s.get(%s) -> %s ******%n",
- instanceName, mapName, key, value);
- }
- }
- public static void main(String... args) throws NumberFormatException, InterruptedException {
- MapStoreExample mse = new MapStoreExample(args[0], Integer.parseInt(args[1]));
- mse.whenEnoughNodes(() -> mse.run());
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement