Guest User

Untitled

a guest
Jul 20th, 2018
87
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.45 KB | None | 0 0
  1. package io.crazy88.beatrix.boot.akka.spring;
  2.  
  3. import static java.lang.Integer.MAX_VALUE;
  4. import static java.util.Optional.ofNullable;
  5. import static java.util.stream.Collectors.toSet;
  6. import static java.util.stream.StreamSupport.stream;
  7.  
  8. import java.util.Collections;
  9. import java.util.HashSet;
  10. import java.util.Optional;
  11. import java.util.Set;
  12. import java.util.concurrent.atomic.AtomicBoolean;
  13. import java.util.function.Predicate;
  14.  
  15. import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
  16. import org.springframework.context.SmartLifecycle;
  17. import org.springframework.context.event.ContextClosedEvent;
  18. import org.springframework.context.event.EventListener;
  19. import org.springframework.core.Ordered;
  20. import org.springframework.stereotype.Component;
  21.  
  22. import akka.actor.ActorRef;
  23. import akka.actor.Props;
  24. import akka.cluster.Cluster;
  25. import akka.cluster.Member;
  26. import akka.cluster.MemberStatus;
  27. import akka.cluster.sharding.ClusterSharding;
  28. import akka.cluster.sharding.ClusterShardingSettings;
  29. import akka.cluster.sharding.ShardRegion;
  30. import akka.management.cluster.bootstrap.ClusterBootstrap;
  31.  
  32. @Component
  33. @ConditionalOnProperty("akka.cluster.enabled")
  34. public class ActorCluster implements SmartLifecycle, Ordered {
  35.  
  36. private final AtomicBoolean running = new AtomicBoolean(false);
  37.  
  38. private final ActorSystem system;
  39.  
  40. private final Set<Runnable> onUpCallbacks = new HashSet<>();
  41.  
  42. private final Set<Runnable> onDownCallbacks = new HashSet<>();
  43.  
  44. private Cluster cluster;
  45.  
  46. private ClusterBootstrap bootstrap;
  47.  
  48. private ClusterSharding sharding;
  49.  
  50. private ClusterShardingSettings settings;
  51.  
  52. public ActorCluster(ActorSystem system) {
  53. this.system = system;
  54. start();
  55. }
  56.  
  57. @Override
  58. public void start() {
  59. if (!isRunning()) {
  60. cluster = system.apply(Cluster::get);
  61. bootstrap = system.apply(ClusterBootstrap::get);
  62. sharding = system.apply(ClusterSharding::get);
  63. settings = system.apply(ClusterShardingSettings::create);
  64. bootstrap.start();
  65. onUpCallbacks.forEach(cluster::registerOnMemberUp);
  66. onDownCallbacks.forEach(cluster::registerOnMemberRemoved);
  67. running.set(true);
  68. }
  69. }
  70.  
  71. @Override
  72. public void stop(Runnable callback) {
  73. stop();
  74. callback.run();
  75. }
  76.  
  77. @Override
  78. public void stop() {
  79. if (isRunning()) {
  80. running.set(false);
  81. cluster = null;
  82. bootstrap = null;
  83. sharding = null;
  84. settings = null;
  85. }
  86. }
  87.  
  88. @Override
  89. public boolean isAutoStartup() {
  90. return true;
  91. }
  92.  
  93. @Override
  94. public boolean isRunning() {
  95. return running.get();
  96. }
  97.  
  98. @Override
  99. public int getPhase() {
  100. // Earlier than InputBindingLifecycle
  101. return MAX_VALUE - 2000 - getOrder();
  102. }
  103.  
  104. @Override
  105. public int getOrder() {
  106. return HIGHEST_PRECEDENCE + 1;
  107. }
  108.  
  109. public void registerOnMemberUp(Runnable callback) {
  110. onUpCallbacks.add(callback);
  111. if (running.get()) {
  112. cluster.registerOnMemberUp(callback);
  113. }
  114. }
  115.  
  116. public void registerOnMemberDown(Runnable callback) {
  117. onDownCallbacks.add(callback);
  118. if (running.get()) {
  119. cluster.registerOnMemberRemoved(callback);
  120. }
  121. }
  122.  
  123. public ActorRef startShardRegion(Class<?> actorClass, ShardRegion.MessageExtractor messageExtractor, Object... args) {
  124. return sharding.start(actorClass.getSimpleName(), system.props(actorClass, args), settings, messageExtractor);
  125. }
  126.  
  127. public ActorRef startShardRegion(Class<?> actorClass, ShardRegion.MessageExtractor messageExtractor, Props props) {
  128. return sharding.start(actorClass.getSimpleName(), props, settings, messageExtractor);
  129. }
  130.  
  131. public ActorRef getShardRegion(Class<?> actorClass) {
  132. return sharding.shardRegion(actorClass.getSimpleName());
  133. }
  134.  
  135. public Optional<ClusterState> getState() {
  136. return ofNullable(cluster)
  137. .map(Cluster::state)
  138. .map(ClusterState::from);
  139. }
  140.  
  141. public boolean isSelfUp() {
  142. return iterableToSet(cluster.state().getMembers()).stream()
  143. .filter(it -> it.address().equals(cluster.selfAddress()))
  144. .anyMatch(it -> it.status().equals(MemberStatus.up()));
  145. }
  146.  
  147. public void forceMemberDown(Set<String> addresses, Set<String> statuses) {
  148. forceMemberDown(toPredicate(addresses, statuses));
  149. }
  150.  
  151. private Predicate<Member> toPredicate(Set<String> addresses, Set<String> statuses) {
  152. return member ->
  153. (statuses.isEmpty() || statuses.contains(member.status().toString()))
  154. && (addresses.isEmpty() || addresses.contains(member.address().toString()));
  155. }
  156.  
  157. private void forceMemberDown(Predicate<Member> memberPredicate) {
  158. if (isRunning()) {
  159. iterableToSet(cluster.state().getMembers()).stream()
  160. .filter(memberPredicate)
  161. .forEach(member -> {
  162. cluster.leave(member.address());
  163. cluster.down(member.address());
  164. });
  165. }
  166. }
  167.  
  168. private static <T> Set<T> iterableToSet(Iterable<T> iterable) {
  169. return stream(ofNullable(iterable).orElseGet(Collections::emptyList).spliterator(), false)
  170. .collect(toSet());
  171. }
  172.  
  173. @EventListener(ContextClosedEvent.class)
  174. public void onApplicationEvent(ContextClosedEvent event) {
  175. stop();
  176. }
  177. }
Add Comment
Please, Sign In to add comment