Advertisement
Guest User

Untitled

a guest
Oct 17th, 2019
101
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 13.96 KB | None | 0 0
  1. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherFactory.java
  2. index 4de454278d..6d4ed32507 100644
  3. --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherFactory.java
  4. +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherFactory.java
  5. @@ -18,9 +18,18 @@
  6.  
  7. package org.apache.flink.runtime.dispatcher;
  8.  
  9. +import org.apache.flink.configuration.Configuration;
  10. +import org.apache.flink.runtime.blob.BlobServer;
  11. +import org.apache.flink.runtime.heartbeat.HeartbeatServices;
  12. +import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
  13. +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
  14. +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
  15. +import org.apache.flink.runtime.rpc.FatalErrorHandler;
  16. import org.apache.flink.runtime.rpc.RpcService;
  17. +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
  18.  
  19. import javax.annotation.Nonnull;
  20. +import javax.annotation.Nullable;
  21.  
  22. import java.util.UUID;
  23.  
  24. @@ -34,7 +43,16 @@ public interface DispatcherFactory<T extends Dispatcher> {
  25. */
  26. T createDispatcher(
  27. @Nonnull RpcService rpcService,
  28. - @Nonnull PartialDispatcherServices partialDispatcherServices) throws Exception;
  29. + @Nonnull Configuration configuration,
  30. + @Nonnull HighAvailabilityServices highAvailabilityServices,
  31. + @Nonnull GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
  32. + @Nonnull BlobServer blobServer,
  33. + @Nonnull HeartbeatServices heartbeatServices,
  34. + @Nonnull JobManagerMetricGroup jobManagerMetricGroup,
  35. + @Nonnull ArchivedExecutionGraphStore archivedExecutionGraphStore,
  36. + @Nonnull FatalErrorHandler fatalErrorHandler,
  37. + @Nonnull HistoryServerArchivist historyServerArchivist,
  38. + @Nullable String metricQueryServiceAddress) throws Exception;
  39.  
  40. default String generateEndpointIdWithUUID() {
  41. return getEndpointId() + UUID.randomUUID();
  42. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherServices.java
  43. index 6751b1224d..817db330e5 100644
  44. --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherServices.java
  45. +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherServices.java
  46. @@ -33,7 +33,7 @@ import javax.annotation.Nullable;
  47. /**
  48. * {@link Dispatcher} services container.
  49. */
  50. -public class PartialDispatcherServices {
  51. +public class DispatcherServices {
  52.  
  53. @Nonnull
  54. private final Configuration configuration;
  55. @@ -68,7 +68,7 @@ public class PartialDispatcherServices {
  56. @Nonnull
  57. private final JobManagerRunnerFactory jobManagerRunnerFactory;
  58.  
  59. - public PartialDispatcherServices(
  60. + public DispatcherServices(
  61. @Nonnull Configuration configuration,
  62. @Nonnull HighAvailabilityServices highAvailabilityServices,
  63. @Nonnull GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
  64. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java
  65. index a74610e047..7dd8ce90f6 100644
  66. --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java
  67. +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java
  68. @@ -19,12 +19,20 @@
  69. package org.apache.flink.runtime.dispatcher;
  70.  
  71. import org.apache.flink.configuration.Configuration;
  72. +import org.apache.flink.runtime.blob.BlobServer;
  73. import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
  74. import org.apache.flink.runtime.entrypoint.component.JobGraphRetriever;
  75. +import org.apache.flink.runtime.heartbeat.HeartbeatServices;
  76. +import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
  77. import org.apache.flink.runtime.jobgraph.JobGraph;
  78. +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
  79. +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
  80. +import org.apache.flink.runtime.rpc.FatalErrorHandler;
  81. import org.apache.flink.runtime.rpc.RpcService;
  82. +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
  83.  
  84. import javax.annotation.Nonnull;
  85. +import javax.annotation.Nullable;
  86.  
  87. import static org.apache.flink.runtime.entrypoint.ClusterEntrypoint.EXECUTION_MODE;
  88.  
  89. @@ -41,9 +49,17 @@ public class JobDispatcherFactory implements DispatcherFactory<MiniDispatcher> {
  90.  
  91. @Override
  92. public MiniDispatcher createDispatcher(
  93. - @Nonnull RpcService rpcService,
  94. - @Nonnull PartialDispatcherServices partialDispatcherServices) throws Exception {
  95. - final Configuration configuration = partialDispatcherServices.getConfiguration();
  96. + @Nonnull RpcService rpcService,
  97. + @Nonnull Configuration configuration,
  98. + @Nonnull HighAvailabilityServices highAvailabilityServices,
  99. + @Nonnull GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
  100. + @Nonnull BlobServer blobServer,
  101. + @Nonnull HeartbeatServices heartbeatServices,
  102. + @Nonnull JobManagerMetricGroup jobManagerMetricGroup,
  103. + @Nonnull ArchivedExecutionGraphStore archivedExecutionGraphStore,
  104. + @Nonnull FatalErrorHandler fatalErrorHandler,
  105. + @Nonnull HistoryServerArchivist historyServerArchivist,
  106. + @Nullable String metricQueryServiceAddress) throws Exception {
  107. final JobGraph jobGraph = jobGraphRetriever.retrieveJobGraph(configuration);
  108.  
  109. final String executionModeValue = configuration.getString(EXECUTION_MODE);
  110. @@ -53,7 +69,18 @@ public class JobDispatcherFactory implements DispatcherFactory<MiniDispatcher> {
  111. return new MiniDispatcher(
  112. rpcService,
  113. getEndpointId(),
  114. - DispatcherServices.from(partialDispatcherServices, DefaultJobManagerRunnerFactory.INSTANCE),
  115. + new DispatcherServices(
  116. + configuration,
  117. + highAvailabilityServices,
  118. + resourceManagerGatewayRetriever,
  119. + blobServer,
  120. + heartbeatServices,
  121. + jobManagerMetricGroup,
  122. + archivedExecutionGraphStore,
  123. + fatalErrorHandler,
  124. + historyServerArchivist,
  125. + metricQueryServiceAddress,
  126. + DefaultJobManagerRunnerFactory.INSTANCE),
  127. jobGraph,
  128. executionMode);
  129. }
  130. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java
  131. index f9b373c029..75306539ce 100644
  132. --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java
  133. +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java
  134. @@ -18,9 +18,18 @@
  135.  
  136. package org.apache.flink.runtime.dispatcher;
  137.  
  138. +import org.apache.flink.configuration.Configuration;
  139. +import org.apache.flink.runtime.blob.BlobServer;
  140. +import org.apache.flink.runtime.heartbeat.HeartbeatServices;
  141. +import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
  142. +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
  143. +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
  144. +import org.apache.flink.runtime.rpc.FatalErrorHandler;
  145. import org.apache.flink.runtime.rpc.RpcService;
  146. +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
  147.  
  148. import javax.annotation.Nonnull;
  149. +import javax.annotation.Nullable;
  150.  
  151. /**
  152. * {@link DispatcherFactory} which creates a {@link StandaloneDispatcher}.
  153. @@ -30,12 +39,32 @@ public enum SessionDispatcherFactory implements DispatcherFactory<Dispatcher> {
  154.  
  155. @Override
  156. public Dispatcher createDispatcher(
  157. - @Nonnull RpcService rpcService,
  158. - @Nonnull PartialDispatcherServices partialDispatcherServices) throws Exception {
  159. + @Nonnull RpcService rpcService,
  160. + @Nonnull Configuration configuration,
  161. + @Nonnull HighAvailabilityServices highAvailabilityServices,
  162. + @Nonnull GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
  163. + @Nonnull BlobServer blobServer,
  164. + @Nonnull HeartbeatServices heartbeatServices,
  165. + @Nonnull JobManagerMetricGroup jobManagerMetricGroup,
  166. + @Nonnull ArchivedExecutionGraphStore archivedExecutionGraphStore,
  167. + @Nonnull FatalErrorHandler fatalErrorHandler,
  168. + @Nonnull HistoryServerArchivist historyServerArchivist,
  169. + @Nullable String metricQueryServiceAddress) throws Exception {
  170. // create the default dispatcher
  171. return new StandaloneDispatcher(
  172. rpcService,
  173. getEndpointId(),
  174. - DispatcherServices.from(partialDispatcherServices, DefaultJobManagerRunnerFactory.INSTANCE));
  175. + new DispatcherServices(
  176. + configuration,
  177. + highAvailabilityServices,
  178. + resourceManagerGatewayRetriever,
  179. + blobServer,
  180. + heartbeatServices,
  181. + jobManagerMetricGroup,
  182. + archivedExecutionGraphStore,
  183. + fatalErrorHandler,
  184. + historyServerArchivist,
  185. + metricQueryServiceAddress,
  186. + DefaultJobManagerRunnerFactory.INSTANCE));
  187. }
  188. }
  189. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
  190. index 6d50ed0490..21dc57d5ea 100644
  191. --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
  192. +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
  193. @@ -32,7 +32,6 @@ import org.apache.flink.runtime.dispatcher.DispatcherFactory;
  194. import org.apache.flink.runtime.dispatcher.DispatcherGateway;
  195. import org.apache.flink.runtime.dispatcher.DispatcherId;
  196. import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
  197. -import org.apache.flink.runtime.dispatcher.PartialDispatcherServices;
  198. import org.apache.flink.runtime.entrypoint.ClusterInformation;
  199. import org.apache.flink.runtime.heartbeat.HeartbeatServices;
  200. import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
  201. @@ -182,7 +181,8 @@ public abstract class AbstractDispatcherResourceManagerComponentFactory<T extend
  202.  
  203. final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, webMonitorEndpoint);
  204.  
  205. - final PartialDispatcherServices partialDispatcherServices = new PartialDispatcherServices(
  206. + dispatcher = dispatcherFactory.createDispatcher(
  207. + rpcService,
  208. configuration,
  209. highAvailabilityServices,
  210. resourceManagerGatewayRetriever,
  211. @@ -194,10 +194,6 @@ public abstract class AbstractDispatcherResourceManagerComponentFactory<T extend
  212. historyServerArchivist,
  213. metricRegistry.getMetricQueryServiceGatewayRpcAddress());
  214.  
  215. - dispatcher = dispatcherFactory.createDispatcher(
  216. - rpcService,
  217. - partialDispatcherServices);
  218. -
  219. log.debug("Starting ResourceManager.");
  220. resourceManager.start();
  221. resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);
  222. diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/SessionDispatcherWithUUIDFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/SessionDispatcherWithUUIDFactory.java
  223. index a85ec92790..aa5e372d60 100644
  224. --- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/SessionDispatcherWithUUIDFactory.java
  225. +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/SessionDispatcherWithUUIDFactory.java
  226. @@ -18,15 +18,25 @@
  227.  
  228. package org.apache.flink.runtime.minicluster;
  229.  
  230. +import org.apache.flink.configuration.Configuration;
  231. +import org.apache.flink.runtime.blob.BlobServer;
  232. +import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
  233. import org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory;
  234. import org.apache.flink.runtime.dispatcher.Dispatcher;
  235. import org.apache.flink.runtime.dispatcher.DispatcherFactory;
  236. import org.apache.flink.runtime.dispatcher.DispatcherServices;
  237. -import org.apache.flink.runtime.dispatcher.PartialDispatcherServices;
  238. +import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
  239. import org.apache.flink.runtime.dispatcher.StandaloneDispatcher;
  240. +import org.apache.flink.runtime.heartbeat.HeartbeatServices;
  241. +import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
  242. +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
  243. +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
  244. +import org.apache.flink.runtime.rpc.FatalErrorHandler;
  245. import org.apache.flink.runtime.rpc.RpcService;
  246. +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
  247.  
  248. import javax.annotation.Nonnull;
  249. +import javax.annotation.Nullable;
  250.  
  251. /**
  252. * {@link DispatcherFactory} which creates a {@link StandaloneDispatcher} which has an
  253. @@ -37,12 +47,32 @@ public enum SessionDispatcherWithUUIDFactory implements DispatcherFactory<Dispat
  254.  
  255. @Override
  256. public Dispatcher createDispatcher(
  257. - @Nonnull RpcService rpcService,
  258. - @Nonnull PartialDispatcherServices partialDispatcherServices) throws Exception {
  259. + @Nonnull RpcService rpcService,
  260. + @Nonnull Configuration configuration,
  261. + @Nonnull HighAvailabilityServices highAvailabilityServices,
  262. + @Nonnull GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
  263. + @Nonnull BlobServer blobServer,
  264. + @Nonnull HeartbeatServices heartbeatServices,
  265. + @Nonnull JobManagerMetricGroup jobManagerMetricGroup,
  266. + @Nonnull ArchivedExecutionGraphStore archivedExecutionGraphStore,
  267. + @Nonnull FatalErrorHandler fatalErrorHandler,
  268. + @Nonnull HistoryServerArchivist historyServerArchivist,
  269. + @Nullable String metricQueryServiceAddress) throws Exception {
  270. // create the default dispatcher
  271. return new StandaloneDispatcher(
  272. rpcService,
  273. generateEndpointIdWithUUID(),
  274. - DispatcherServices.from(partialDispatcherServices, DefaultJobManagerRunnerFactory.INSTANCE));
  275. + new DispatcherServices(
  276. + configuration,
  277. + highAvailabilityServices,
  278. + resourceManagerGatewayRetriever,
  279. + blobServer,
  280. + heartbeatServices,
  281. + jobManagerMetricGroup,
  282. + archivedExecutionGraphStore,
  283. + fatalErrorHandler,
  284. + historyServerArchivist,
  285. + metricQueryServiceAddress,
  286. + DefaultJobManagerRunnerFactory.INSTANCE));
  287. }
  288. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement