Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package io.crazy88.beatrix.boot.akka.spring;
- import static java.lang.Integer.MAX_VALUE;
- import static java.util.Optional.ofNullable;
- import static java.util.stream.Collectors.toSet;
- import static java.util.stream.StreamSupport.stream;
- import java.util.Collections;
- import java.util.HashSet;
- import java.util.Optional;
- import java.util.Set;
- import java.util.concurrent.atomic.AtomicBoolean;
- import java.util.function.Predicate;
- import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
- import org.springframework.context.SmartLifecycle;
- import org.springframework.context.event.ContextClosedEvent;
- import org.springframework.context.event.EventListener;
- import org.springframework.core.Ordered;
- import org.springframework.stereotype.Component;
- import akka.actor.ActorRef;
- import akka.actor.Props;
- import akka.cluster.Cluster;
- import akka.cluster.Member;
- import akka.cluster.MemberStatus;
- import akka.cluster.sharding.ClusterSharding;
- import akka.cluster.sharding.ClusterShardingSettings;
- import akka.cluster.sharding.ShardRegion;
- import akka.management.cluster.bootstrap.ClusterBootstrap;
- @Component
- @ConditionalOnProperty("akka.cluster.enabled")
- public class ActorCluster implements SmartLifecycle, Ordered {
- private final AtomicBoolean running = new AtomicBoolean(false);
- private final ActorSystem system;
- private final Set<Runnable> onUpCallbacks = new HashSet<>();
- private final Set<Runnable> onDownCallbacks = new HashSet<>();
- private Cluster cluster;
- private ClusterBootstrap bootstrap;
- private ClusterSharding sharding;
- private ClusterShardingSettings settings;
- public ActorCluster(ActorSystem system) {
- this.system = system;
- start();
- }
- @Override
- public void start() {
- if (!isRunning()) {
- cluster = system.apply(Cluster::get);
- bootstrap = system.apply(ClusterBootstrap::get);
- sharding = system.apply(ClusterSharding::get);
- settings = system.apply(ClusterShardingSettings::create);
- bootstrap.start();
- onUpCallbacks.forEach(cluster::registerOnMemberUp);
- onDownCallbacks.forEach(cluster::registerOnMemberRemoved);
- running.set(true);
- }
- }
- @Override
- public void stop(Runnable callback) {
- stop();
- callback.run();
- }
- @Override
- public void stop() {
- if (isRunning()) {
- running.set(false);
- cluster = null;
- bootstrap = null;
- sharding = null;
- settings = null;
- }
- }
- @Override
- public boolean isAutoStartup() {
- return true;
- }
- @Override
- public boolean isRunning() {
- return running.get();
- }
- @Override
- public int getPhase() {
- // Earlier than InputBindingLifecycle
- return MAX_VALUE - 2000 - getOrder();
- }
- @Override
- public int getOrder() {
- return HIGHEST_PRECEDENCE + 1;
- }
- public void registerOnMemberUp(Runnable callback) {
- onUpCallbacks.add(callback);
- if (running.get()) {
- cluster.registerOnMemberUp(callback);
- }
- }
- public void registerOnMemberDown(Runnable callback) {
- onDownCallbacks.add(callback);
- if (running.get()) {
- cluster.registerOnMemberRemoved(callback);
- }
- }
- public ActorRef startShardRegion(Class<?> actorClass, ShardRegion.MessageExtractor messageExtractor, Object... args) {
- return sharding.start(actorClass.getSimpleName(), system.props(actorClass, args), settings, messageExtractor);
- }
- public ActorRef startShardRegion(Class<?> actorClass, ShardRegion.MessageExtractor messageExtractor, Props props) {
- return sharding.start(actorClass.getSimpleName(), props, settings, messageExtractor);
- }
- public ActorRef getShardRegion(Class<?> actorClass) {
- return sharding.shardRegion(actorClass.getSimpleName());
- }
- public Optional<ClusterState> getState() {
- return ofNullable(cluster)
- .map(Cluster::state)
- .map(ClusterState::from);
- }
- public boolean isSelfUp() {
- return iterableToSet(cluster.state().getMembers()).stream()
- .filter(it -> it.address().equals(cluster.selfAddress()))
- .anyMatch(it -> it.status().equals(MemberStatus.up()));
- }
- public void forceMemberDown(Set<String> addresses, Set<String> statuses) {
- forceMemberDown(toPredicate(addresses, statuses));
- }
- private Predicate<Member> toPredicate(Set<String> addresses, Set<String> statuses) {
- return member ->
- (statuses.isEmpty() || statuses.contains(member.status().toString()))
- && (addresses.isEmpty() || addresses.contains(member.address().toString()));
- }
- private void forceMemberDown(Predicate<Member> memberPredicate) {
- if (isRunning()) {
- iterableToSet(cluster.state().getMembers()).stream()
- .filter(memberPredicate)
- .forEach(member -> {
- cluster.leave(member.address());
- cluster.down(member.address());
- });
- }
- }
- private static <T> Set<T> iterableToSet(Iterable<T> iterable) {
- return stream(ofNullable(iterable).orElseGet(Collections::emptyList).spliterator(), false)
- .collect(toSet());
- }
- @EventListener(ContextClosedEvent.class)
- public void onApplicationEvent(ContextClosedEvent event) {
- stop();
- }
- }
Add Comment
Please, Sign In to add comment