Advertisement
Guest User

Untitled

a guest
Mar 22nd, 2017
74
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 10.58 KB | None | 0 0
  1. diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
  2. index c3cbdbc..1b1a099 100644
  3. --- a/src/slave/slave.cpp
  4. +++ b/src/slave/slave.cpp
  5. @@ -2186,7 +2186,22 @@ void Slave::__run(
  6. Executor* executor = framework->getExecutor(executorId);
  7.  
  8. if (executor == nullptr) {
  9. - executor = launchExecutor(frameworkId, executorInfo, task, taskGroup);
  10. + executor = addExecutor(framework, executorInfo, task, taskGroup);
  11. +
  12. + if (secretGenerator.get()) {
  13. + generateSecret(framework->id(), executor->id, executor->containerId)
  14. + .onAny(defer(
  15. + self(),
  16. + Self::launchExecutor,
  17. + lambda::_1,
  18. + executorInfo,
  19. + taskGroup.isNone() ? task.get() : Option<TaskInfo>::none()));
  20. + } else {
  21. + launchExecutor(
  22. + None(),
  23. + executorInfo,
  24. + taskGroup.isNone() ? task.get() : Option<TaskInfo>::none());
  25. + }
  26. }
  27.  
  28. CHECK_NOTNULL(executor);
  29. @@ -2516,15 +2531,13 @@ void Slave::___run(
  30.  
  31. // Create and launch an executor.
  32. Executor* Slave::launchExecutor(
  33. - const FrameworkID& frameworkId,
  34. + Framework* framework,
  35. const ExecutorInfo& executorInfo,
  36. const Option<TaskInfo>& taskInfo,
  37. const Option<TaskGroupInfo>& taskGroup)
  38. {
  39. - // This function is executed synchronously from `__run()` after the framework
  40. - // has been validated, so we assume the framework to be valid here.
  41. - Framework* framework = getFramework(frameworkId);
  42. CHECK_NOTNULL(framework);
  43. + const FrameworkID& frameworkId = framework->info.id();
  44.  
  45. // Verify that Resource.AllocationInfo is set, if coming
  46. // from a MULTI_ROLE master this will be set, otherwise
  47. @@ -2646,79 +2659,37 @@ Executor* Slave::launchExecutor(
  48. secretGenerator->generate(principal)
  49. .onAny(defer(
  50. self(),
  51. - [this,
  52. - frameworkId,
  53. - executorInfo_,
  54. - taskInfo,
  55. - taskGroup,
  56. - containerId](const Future<Secret>& future) {
  57. - // Helper to fail tasks if secret generation was unsuccessful.
  58. - auto failTasks =
  59. - [this, &frameworkId, &executorInfo_, &taskInfo, &taskGroup]
  60. - (const string& failedMessage) {
  61. - LOG(ERROR) << "Ignoring running "
  62. - << taskOrTaskGroup(taskInfo, taskGroup)
  63. - << " of framework " << frameworkId << ": "
  64. - << failedMessage;
  65. -
  66. - Framework* framework = getFramework(frameworkId);
  67. - if (framework == nullptr) {
  68. - LOG(WARNING) << "Ignoring sending status update for "
  69. - << taskOrTaskGroup(taskInfo, taskGroup)
  70. - << " because the framework " << frameworkId
  71. - << " does not exist";
  72. - return;
  73. - }
  74. -
  75. - vector<TaskInfo> tasks;
  76. - if (taskInfo.isSome()) {
  77. - tasks.push_back(taskInfo.get());
  78. - } else {
  79. - foreach (const TaskInfo& _task, taskGroup->tasks()) {
  80. - tasks.push_back(_task);
  81. - }
  82. - }
  83. -
  84. - foreach (const TaskInfo& _task, tasks) {
  85. - const StatusUpdate update = protobuf::createStatusUpdate(
  86. - frameworkId,
  87. - info.id(),
  88. - _task.task_id(),
  89. - TASK_FAILED,
  90. - TaskStatus::SOURCE_SLAVE,
  91. - UUID::random(),
  92. - failedMessage,
  93. - TaskStatus::REASON_EXECUTOR_TERMINATED);
  94. -
  95. - statusUpdate(update, UPID());
  96. - }
  97. -
  98. - const ExecutorID& executorId = executorInfo_.executor_id();
  99. - Executor* executor = framework->getExecutor(executorId);
  100. - if (executor == nullptr) {
  101. - LOG(WARNING) << "Ignoring removing executor " << executorId
  102. - << " because the executor does not exist";
  103. - return;
  104. - }
  105. -
  106. - removeExecutor(framework, executor);
  107. + [this, frameworkId, executorInfo_, taskInfo, taskGroup, containerId](
  108. + const Future<Secret>& future) {
  109. + // Helper for executor termination if secret generation
  110. + // was unsuccessful.
  111. + auto terminateExecutor =
  112. + [this, frameworkId, executorInfo_](const string& message) {
  113. + ContainerTermination termination;
  114. + termination.set_state(TASK_FAILED);
  115. + termination.add_reasons(
  116. + TaskStatus::REASON_CONTAINER_UPDATE_FAILED);
  117. +
  118. + termination.set_message(message);
  119. +
  120. + executorTerminated(frameworkId, executorInfo_.executor_id(), termination);
  121. };
  122.  
  123. Option<Secret> authenticationToken;
  124.  
  125. if (!future.isReady()) {
  126. - failTasks(
  127. + terminateExecutor(
  128. "Secret generation failed: " +
  129. (future.isFailed() ? future.failure() : "discarded"));
  130. } else {
  131. Option<Error> error = validateSecret(future.get());
  132.  
  133. if (error.isSome()) {
  134. - failTasks(
  135. + terminateExecutor(
  136. "Secret generator produced an invalid secret: " +
  137. error.get().message);
  138. } else if (!(future.get().type() == Secret::VALUE)) {
  139. - failTasks(
  140. + terminateExecutor(
  141. "Secret generator produced a secret that is not VALUE "
  142. "type; only VALUE-type secrets are supported at this time");
  143. } else {
  144. @@ -2758,6 +2729,120 @@ Executor* Slave::launchExecutor(
  145. }
  146.  
  147.  
  148. +Executor* Framework::addExecutor(
  149. + const ExecutorInfo& executorInfo)
  150. +{
  151. + // Verify that Resource.AllocationInfo is set, if coming
  152. + // from a MULTI_ROLE master this will be set, otherwise
  153. + // the agent will inject it when receiving the executor.
  154. + foreach (const Resource& resource, executorInfo.resources()) {
  155. + CHECK(resource.has_allocation_info());
  156. + }
  157. +
  158. + // Generate an ID for the executor's container.
  159. + // TODO(idownes) This should be done by the containerizer but we
  160. + // need the ContainerID to create the executor's directory. Fix
  161. + // this when 'launchExecutor()' is handled asynchronously.
  162. + ContainerID containerId;
  163. + containerId.set_value(UUID::random().toString());
  164. +
  165. + Option<string> user = None();
  166. +#ifndef __WINDOWS__
  167. + if (slave->flags.switch_user) {
  168. + // The command (either in form of task or executor command) can
  169. + // define a specific user to run as. If present, this precedes the
  170. + // framework user value. The selected user will have been verified by
  171. + // the master at this point through the active ACLs.
  172. + // NOTE: The global invariant is that the executor info at this
  173. + // point is (1) the user provided task.executor() or (2) a command
  174. + // executor constructed by the slave from the task.command().
  175. + // If this changes, we need to check the user in both
  176. + // task.command() and task.executor().command() below.
  177. + user = info.user();
  178. + if (executorInfo.command().has_user()) {
  179. + user = executorInfo.command().user();
  180. + }
  181. + }
  182. +#endif // __WINDOWS__
  183. +
  184. + // Create a directory for the executor.
  185. + const string directory = paths::createExecutorDirectory(
  186. + slave->flags.work_dir,
  187. + slave->info.id(),
  188. + id(),
  189. + executorInfo.executor_id(),
  190. + containerId,
  191. + user);
  192. +
  193. + Executor* executor = new Executor(
  194. + slave,
  195. + id(),
  196. + executorInfo,
  197. + containerId,
  198. + directory,
  199. + user,
  200. + info.checkpoint());
  201. +
  202. + if (executor->checkpoint) {
  203. + executor->checkpointExecutor();
  204. + }
  205. +
  206. + CHECK(!executors.contains(executorInfo.executor_id()))
  207. + << "Unknown executor '" << executorInfo.executor_id() << "'";
  208. +
  209. + executors[executorInfo.executor_id()] = executor;
  210. +
  211. + const ExecutorID& executorId = executorInfo.executor_id();
  212. + const PID<Slave> slavePid = self();
  213. +
  214. + auto authorize =
  215. + [slavePid, executorId, frameworkId](const Option<Principal>& principal) {
  216. + return dispatch(
  217. + slavePid,
  218. + &Slave::authorizeSandboxAccess,
  219. + principal,
  220. + frameworkId,
  221. + executorId);
  222. + };
  223. +
  224. + files->attach(executor->directory, executor->directory, authorize)
  225. + .onAny(defer(self(), &Self::fileAttached, lambda::_1, executor->directory));
  226. +
  227. + return executor;
  228. +}
  229. +
  230. +
  231. +Future<Secret> Slave::generateSecret(
  232. + const FrameworkID& frameworkId,
  233. + const ExecutorID& executorId,
  234. + const ContainerID& containerId)
  235. +{
  236. + Principal principal(
  237. + Option<string>::none(),
  238. + {
  239. + {"cid", containerId.value()},
  240. + {"eid", executorId.value()},
  241. + {"fid", frameworkId.value()}
  242. + });
  243. +
  244. + return secretGenerator->generate(principal)
  245. + .then([](const Secret& secret) {
  246. + Option<Error> error = validateSecret(secret);
  247. + if (error.isSome()) {
  248. + return Future<Secret>::failure(
  249. + "Secret generator produced an invalid secret: " +
  250. + error.get().message);
  251. + } else if (secret.type() != Secret::VALUE) {
  252. + return Future<Secret>::failure(
  253. + "Secret generator produced a secret that is not VALUE "
  254. + "type; only VALUE-type secrets are supported at this time");
  255. +
  256. + }
  257. + return secret;
  258. + });
  259. +}
  260. +
  261. +
  262. void Slave::_launchExecutor(
  263. const FrameworkID& frameworkId,
  264. const ExecutorInfo& executorInfo,
  265. @@ -2780,11 +2865,6 @@ void Slave::_launchExecutor(
  266. LOG(WARNING) << "Ignoring running " << taskOrTaskGroup(taskInfo, taskGroup)
  267. << " of framework " << frameworkId
  268. << " because the framework is terminating";
  269. -
  270. - if (framework->executors.empty() && framework->pending.empty()) {
  271. - removeFramework(framework);
  272. - }
  273. -
  274. return;
  275. }
  276.  
  277. diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
  278. index 6a06093..cfe81ea 100644
  279. --- a/src/slave/slave.hpp
  280. +++ b/src/slave/slave.hpp
  281. @@ -362,7 +362,7 @@ public:
  282. // If an executor is launched for a task group, `taskInfo` would
  283. // not be set.
  284. Executor* launchExecutor(
  285. - const FrameworkID& frameworkId,
  286. + Framework* framework,
  287. const ExecutorInfo& executorInfo,
  288. const Option<TaskInfo>& taskInfo,
  289. const Option<TaskGroupInfo>& taskGroup);
  290. @@ -1120,6 +1120,7 @@ struct Framework
  291. ~Framework();
  292.  
  293. void destroyExecutor(const ExecutorID& executorId);
  294. + Executor* addExecutor(const ExecutorInfo& executorInfo);
  295. Executor* getExecutor(const ExecutorID& executorId) const;
  296. Executor* getExecutor(const TaskID& taskId) const;
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement