Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package org.apache.kafka.common.security.kerberos;
- import javax.security.auth.Subject;
- import javax.security.auth.login.LoginException;
- import java.io.IOException;
- import java.util.Map;
- import org.apache.kafka.common.security.JaasUtils;
- import org.apache.kafka.common.KafkaException;
- import org.apache.kafka.common.Configurable;
- import org.apache.zookeeper.Login;
- public class LoginManager implements Configurable {
- public enum Mode { CLIENT, SERVER };
- private Login login;
- private final String serviceName;
- private final String loginContext;
- private final Mode mode;
- volatile private static LoginManager INSTANCE;
- private LoginManager(Mode mode) throws IOException, LoginException {
- this.mode = mode;
- if (mode == Mode.SERVER)
- this.loginContext = JaasUtils.LOGIN_CONTEXT_SERVER;
- else
- this.loginContext = JaasUtils.LOGIN_CONTEXT_CLIENT;
- this.serviceName = JaasUtils.jaasConfig(loginContext, JaasUtils.SERVICE_NAME);
- }
- public static final LoginManager getLoginManager(Mode mode) throws IOException, LoginException {
- if(INSTANCE != null && INSTANCE.mode.equals(mode)) {
- return INSTANCE;
- } else if (INSTANCE != null && !INSTANCE.mode.equals(mode)) {
- throw new IllegalArgumentException("cant change mode once initialized");
- } else synchronized (LoginManager.class) {
- if(INSTANCE == null) {
- INSTANCE = new LoginManager(mode);
- }
- }
- return INSTANCE;
- }
- @Override
- public void configure(Map<String, ?> configs) throws KafkaException {
- try {
- login = new Login(loginContext);
- login.startThreadIfNeeded();
- } catch (Exception e) {
- throw new KafkaException(e);
- }
- }
- public Subject subject() {
- return login.subject();
- }
- public String serviceName() {
- return serviceName;
- }
- public void close() {
- login.shutdown();
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement