Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherFactory.java
- index 4de454278d..6d4ed32507 100644
- --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherFactory.java
- +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherFactory.java
- @@ -18,9 +18,18 @@
- package org.apache.flink.runtime.dispatcher;
- +import org.apache.flink.configuration.Configuration;
- +import org.apache.flink.runtime.blob.BlobServer;
- +import org.apache.flink.runtime.heartbeat.HeartbeatServices;
- +import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
- +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
- +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
- +import org.apache.flink.runtime.rpc.FatalErrorHandler;
- import org.apache.flink.runtime.rpc.RpcService;
- +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
- import javax.annotation.Nonnull;
- +import javax.annotation.Nullable;
- import java.util.UUID;
- @@ -34,7 +43,16 @@ public interface DispatcherFactory<T extends Dispatcher> {
- */
- T createDispatcher(
- @Nonnull RpcService rpcService,
- - @Nonnull PartialDispatcherServices partialDispatcherServices) throws Exception;
- + @Nonnull Configuration configuration,
- + @Nonnull HighAvailabilityServices highAvailabilityServices,
- + @Nonnull GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
- + @Nonnull BlobServer blobServer,
- + @Nonnull HeartbeatServices heartbeatServices,
- + @Nonnull JobManagerMetricGroup jobManagerMetricGroup,
- + @Nonnull ArchivedExecutionGraphStore archivedExecutionGraphStore,
- + @Nonnull FatalErrorHandler fatalErrorHandler,
- + @Nonnull HistoryServerArchivist historyServerArchivist,
- + @Nullable String metricQueryServiceAddress) throws Exception;
- default String generateEndpointIdWithUUID() {
- return getEndpointId() + UUID.randomUUID();
- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherServices.java
- index 6751b1224d..817db330e5 100644
- --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherServices.java
- +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherServices.java
- @@ -33,7 +33,7 @@ import javax.annotation.Nullable;
- /**
- * {@link Dispatcher} services container.
- */
- -public class PartialDispatcherServices {
- +public class DispatcherServices {
- @Nonnull
- private final Configuration configuration;
- @@ -68,7 +68,7 @@ public class PartialDispatcherServices {
- @Nonnull
- private final JobManagerRunnerFactory jobManagerRunnerFactory;
- - public PartialDispatcherServices(
- + public DispatcherServices(
- @Nonnull Configuration configuration,
- @Nonnull HighAvailabilityServices highAvailabilityServices,
- @Nonnull GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java
- index a74610e047..7dd8ce90f6 100644
- --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java
- +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java
- @@ -19,12 +19,20 @@
- package org.apache.flink.runtime.dispatcher;
- import org.apache.flink.configuration.Configuration;
- +import org.apache.flink.runtime.blob.BlobServer;
- import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
- import org.apache.flink.runtime.entrypoint.component.JobGraphRetriever;
- +import org.apache.flink.runtime.heartbeat.HeartbeatServices;
- +import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
- import org.apache.flink.runtime.jobgraph.JobGraph;
- +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
- +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
- +import org.apache.flink.runtime.rpc.FatalErrorHandler;
- import org.apache.flink.runtime.rpc.RpcService;
- +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
- import javax.annotation.Nonnull;
- +import javax.annotation.Nullable;
- import static org.apache.flink.runtime.entrypoint.ClusterEntrypoint.EXECUTION_MODE;
- @@ -41,9 +49,17 @@ public class JobDispatcherFactory implements DispatcherFactory<MiniDispatcher> {
- @Override
- public MiniDispatcher createDispatcher(
- - @Nonnull RpcService rpcService,
- - @Nonnull PartialDispatcherServices partialDispatcherServices) throws Exception {
- - final Configuration configuration = partialDispatcherServices.getConfiguration();
- + @Nonnull RpcService rpcService,
- + @Nonnull Configuration configuration,
- + @Nonnull HighAvailabilityServices highAvailabilityServices,
- + @Nonnull GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
- + @Nonnull BlobServer blobServer,
- + @Nonnull HeartbeatServices heartbeatServices,
- + @Nonnull JobManagerMetricGroup jobManagerMetricGroup,
- + @Nonnull ArchivedExecutionGraphStore archivedExecutionGraphStore,
- + @Nonnull FatalErrorHandler fatalErrorHandler,
- + @Nonnull HistoryServerArchivist historyServerArchivist,
- + @Nullable String metricQueryServiceAddress) throws Exception {
- final JobGraph jobGraph = jobGraphRetriever.retrieveJobGraph(configuration);
- final String executionModeValue = configuration.getString(EXECUTION_MODE);
- @@ -53,7 +69,18 @@ public class JobDispatcherFactory implements DispatcherFactory<MiniDispatcher> {
- return new MiniDispatcher(
- rpcService,
- getEndpointId(),
- - DispatcherServices.from(partialDispatcherServices, DefaultJobManagerRunnerFactory.INSTANCE),
- + new DispatcherServices(
- + configuration,
- + highAvailabilityServices,
- + resourceManagerGatewayRetriever,
- + blobServer,
- + heartbeatServices,
- + jobManagerMetricGroup,
- + archivedExecutionGraphStore,
- + fatalErrorHandler,
- + historyServerArchivist,
- + metricQueryServiceAddress,
- + DefaultJobManagerRunnerFactory.INSTANCE),
- jobGraph,
- executionMode);
- }
- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java
- index f9b373c029..75306539ce 100644
- --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java
- +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java
- @@ -18,9 +18,18 @@
- package org.apache.flink.runtime.dispatcher;
- +import org.apache.flink.configuration.Configuration;
- +import org.apache.flink.runtime.blob.BlobServer;
- +import org.apache.flink.runtime.heartbeat.HeartbeatServices;
- +import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
- +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
- +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
- +import org.apache.flink.runtime.rpc.FatalErrorHandler;
- import org.apache.flink.runtime.rpc.RpcService;
- +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
- import javax.annotation.Nonnull;
- +import javax.annotation.Nullable;
- /**
- * {@link DispatcherFactory} which creates a {@link StandaloneDispatcher}.
- @@ -30,12 +39,32 @@ public enum SessionDispatcherFactory implements DispatcherFactory<Dispatcher> {
- @Override
- public Dispatcher createDispatcher(
- - @Nonnull RpcService rpcService,
- - @Nonnull PartialDispatcherServices partialDispatcherServices) throws Exception {
- + @Nonnull RpcService rpcService,
- + @Nonnull Configuration configuration,
- + @Nonnull HighAvailabilityServices highAvailabilityServices,
- + @Nonnull GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
- + @Nonnull BlobServer blobServer,
- + @Nonnull HeartbeatServices heartbeatServices,
- + @Nonnull JobManagerMetricGroup jobManagerMetricGroup,
- + @Nonnull ArchivedExecutionGraphStore archivedExecutionGraphStore,
- + @Nonnull FatalErrorHandler fatalErrorHandler,
- + @Nonnull HistoryServerArchivist historyServerArchivist,
- + @Nullable String metricQueryServiceAddress) throws Exception {
- // create the default dispatcher
- return new StandaloneDispatcher(
- rpcService,
- getEndpointId(),
- - DispatcherServices.from(partialDispatcherServices, DefaultJobManagerRunnerFactory.INSTANCE));
- + new DispatcherServices(
- + configuration,
- + highAvailabilityServices,
- + resourceManagerGatewayRetriever,
- + blobServer,
- + heartbeatServices,
- + jobManagerMetricGroup,
- + archivedExecutionGraphStore,
- + fatalErrorHandler,
- + historyServerArchivist,
- + metricQueryServiceAddress,
- + DefaultJobManagerRunnerFactory.INSTANCE));
- }
- }
- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
- index 6d50ed0490..21dc57d5ea 100644
- --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
- +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
- @@ -32,7 +32,6 @@ import org.apache.flink.runtime.dispatcher.DispatcherFactory;
- import org.apache.flink.runtime.dispatcher.DispatcherGateway;
- import org.apache.flink.runtime.dispatcher.DispatcherId;
- import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
- -import org.apache.flink.runtime.dispatcher.PartialDispatcherServices;
- import org.apache.flink.runtime.entrypoint.ClusterInformation;
- import org.apache.flink.runtime.heartbeat.HeartbeatServices;
- import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
- @@ -182,7 +181,8 @@ public abstract class AbstractDispatcherResourceManagerComponentFactory<T extend
- final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, webMonitorEndpoint);
- - final PartialDispatcherServices partialDispatcherServices = new PartialDispatcherServices(
- + dispatcher = dispatcherFactory.createDispatcher(
- + rpcService,
- configuration,
- highAvailabilityServices,
- resourceManagerGatewayRetriever,
- @@ -194,10 +194,6 @@ public abstract class AbstractDispatcherResourceManagerComponentFactory<T extend
- historyServerArchivist,
- metricRegistry.getMetricQueryServiceGatewayRpcAddress());
- - dispatcher = dispatcherFactory.createDispatcher(
- - rpcService,
- - partialDispatcherServices);
- -
- log.debug("Starting ResourceManager.");
- resourceManager.start();
- resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);
- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/SessionDispatcherWithUUIDFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/SessionDispatcherWithUUIDFactory.java
- index a85ec92790..aa5e372d60 100644
- --- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/SessionDispatcherWithUUIDFactory.java
- +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/SessionDispatcherWithUUIDFactory.java
- @@ -18,15 +18,25 @@
- package org.apache.flink.runtime.minicluster;
- +import org.apache.flink.configuration.Configuration;
- +import org.apache.flink.runtime.blob.BlobServer;
- +import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
- import org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory;
- import org.apache.flink.runtime.dispatcher.Dispatcher;
- import org.apache.flink.runtime.dispatcher.DispatcherFactory;
- import org.apache.flink.runtime.dispatcher.DispatcherServices;
- -import org.apache.flink.runtime.dispatcher.PartialDispatcherServices;
- +import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
- import org.apache.flink.runtime.dispatcher.StandaloneDispatcher;
- +import org.apache.flink.runtime.heartbeat.HeartbeatServices;
- +import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
- +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
- +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
- +import org.apache.flink.runtime.rpc.FatalErrorHandler;
- import org.apache.flink.runtime.rpc.RpcService;
- +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
- import javax.annotation.Nonnull;
- +import javax.annotation.Nullable;
- /**
- * {@link DispatcherFactory} which creates a {@link StandaloneDispatcher} which has an
- @@ -37,12 +47,32 @@ public enum SessionDispatcherWithUUIDFactory implements DispatcherFactory<Dispat
- @Override
- public Dispatcher createDispatcher(
- - @Nonnull RpcService rpcService,
- - @Nonnull PartialDispatcherServices partialDispatcherServices) throws Exception {
- + @Nonnull RpcService rpcService,
- + @Nonnull Configuration configuration,
- + @Nonnull HighAvailabilityServices highAvailabilityServices,
- + @Nonnull GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
- + @Nonnull BlobServer blobServer,
- + @Nonnull HeartbeatServices heartbeatServices,
- + @Nonnull JobManagerMetricGroup jobManagerMetricGroup,
- + @Nonnull ArchivedExecutionGraphStore archivedExecutionGraphStore,
- + @Nonnull FatalErrorHandler fatalErrorHandler,
- + @Nonnull HistoryServerArchivist historyServerArchivist,
- + @Nullable String metricQueryServiceAddress) throws Exception {
- // create the default dispatcher
- return new StandaloneDispatcher(
- rpcService,
- generateEndpointIdWithUUID(),
- - DispatcherServices.from(partialDispatcherServices, DefaultJobManagerRunnerFactory.INSTANCE));
- + new DispatcherServices(
- + configuration,
- + highAvailabilityServices,
- + resourceManagerGatewayRetriever,
- + blobServer,
- + heartbeatServices,
- + jobManagerMetricGroup,
- + archivedExecutionGraphStore,
- + fatalErrorHandler,
- + historyServerArchivist,
- + metricQueryServiceAddress,
- + DefaultJobManagerRunnerFactory.INSTANCE));
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement