Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
- index c3cbdbc..1b1a099 100644
- --- a/src/slave/slave.cpp
- +++ b/src/slave/slave.cpp
- @@ -2186,7 +2186,22 @@ void Slave::__run(
- Executor* executor = framework->getExecutor(executorId);
- if (executor == nullptr) {
- - executor = launchExecutor(frameworkId, executorInfo, task, taskGroup);
- + executor = addExecutor(framework, executorInfo, task, taskGroup);
- +
- + if (secretGenerator.get()) {
- + generateSecret(framework->id(), executor->id, executor->containerId)
- + .onAny(defer(
- + self(),
- + Self::launchExecutor,
- + lambda::_1,
- + executorInfo,
- + taskGroup.isNone() ? task.get() : Option<TaskInfo>::none()));
- + } else {
- + launchExecutor(
- + None(),
- + executorInfo,
- + taskGroup.isNone() ? task.get() : Option<TaskInfo>::none());
- + }
- }
- CHECK_NOTNULL(executor);
- @@ -2516,15 +2531,13 @@ void Slave::___run(
- // Create and launch an executor.
- Executor* Slave::launchExecutor(
- - const FrameworkID& frameworkId,
- + Framework* framework,
- const ExecutorInfo& executorInfo,
- const Option<TaskInfo>& taskInfo,
- const Option<TaskGroupInfo>& taskGroup)
- {
- - // This function is executed synchronously from `__run()` after the framework
- - // has been validated, so we assume the framework to be valid here.
- - Framework* framework = getFramework(frameworkId);
- CHECK_NOTNULL(framework);
- + const FrameworkID& frameworkId = framework->info.id();
- // Verify that Resource.AllocationInfo is set, if coming
- // from a MULTI_ROLE master this will be set, otherwise
- @@ -2646,79 +2659,37 @@ Executor* Slave::launchExecutor(
- secretGenerator->generate(principal)
- .onAny(defer(
- self(),
- - [this,
- - frameworkId,
- - executorInfo_,
- - taskInfo,
- - taskGroup,
- - containerId](const Future<Secret>& future) {
- - // Helper to fail tasks if secret generation was unsuccessful.
- - auto failTasks =
- - [this, &frameworkId, &executorInfo_, &taskInfo, &taskGroup]
- - (const string& failedMessage) {
- - LOG(ERROR) << "Ignoring running "
- - << taskOrTaskGroup(taskInfo, taskGroup)
- - << " of framework " << frameworkId << ": "
- - << failedMessage;
- -
- - Framework* framework = getFramework(frameworkId);
- - if (framework == nullptr) {
- - LOG(WARNING) << "Ignoring sending status update for "
- - << taskOrTaskGroup(taskInfo, taskGroup)
- - << " because the framework " << frameworkId
- - << " does not exist";
- - return;
- - }
- -
- - vector<TaskInfo> tasks;
- - if (taskInfo.isSome()) {
- - tasks.push_back(taskInfo.get());
- - } else {
- - foreach (const TaskInfo& _task, taskGroup->tasks()) {
- - tasks.push_back(_task);
- - }
- - }
- -
- - foreach (const TaskInfo& _task, tasks) {
- - const StatusUpdate update = protobuf::createStatusUpdate(
- - frameworkId,
- - info.id(),
- - _task.task_id(),
- - TASK_FAILED,
- - TaskStatus::SOURCE_SLAVE,
- - UUID::random(),
- - failedMessage,
- - TaskStatus::REASON_EXECUTOR_TERMINATED);
- -
- - statusUpdate(update, UPID());
- - }
- -
- - const ExecutorID& executorId = executorInfo_.executor_id();
- - Executor* executor = framework->getExecutor(executorId);
- - if (executor == nullptr) {
- - LOG(WARNING) << "Ignoring removing executor " << executorId
- - << " because the executor does not exist";
- - return;
- - }
- -
- - removeExecutor(framework, executor);
- + [this, frameworkId, executorInfo_, taskInfo, taskGroup, containerId](
- + const Future<Secret>& future) {
- + // Helper for executor termination if secret generation
- + // was unsuccessful.
- + auto terminateExecutor =
- + [this, frameworkId, executorInfo_](const string& message) {
- + ContainerTermination termination;
- + termination.set_state(TASK_FAILED);
- + termination.add_reasons(
- + TaskStatus::REASON_CONTAINER_UPDATE_FAILED);
- +
- + termination.set_message(message);
- +
- + executorTerminated(frameworkId, executorInfo_.executor_id(), termination);
- };
- Option<Secret> authenticationToken;
- if (!future.isReady()) {
- - failTasks(
- + terminateExecutor(
- "Secret generation failed: " +
- (future.isFailed() ? future.failure() : "discarded"));
- } else {
- Option<Error> error = validateSecret(future.get());
- if (error.isSome()) {
- - failTasks(
- + terminateExecutor(
- "Secret generator produced an invalid secret: " +
- error.get().message);
- } else if (!(future.get().type() == Secret::VALUE)) {
- - failTasks(
- + terminateExecutor(
- "Secret generator produced a secret that is not VALUE "
- "type; only VALUE-type secrets are supported at this time");
- } else {
- @@ -2758,6 +2729,120 @@ Executor* Slave::launchExecutor(
- }
- +Executor* Framework::addExecutor(
- + const ExecutorInfo& executorInfo)
- +{
- + // Verify that Resource.AllocationInfo is set, if coming
- + // from a MULTI_ROLE master this will be set, otherwise
- + // the agent will inject it when receiving the executor.
- + foreach (const Resource& resource, executorInfo.resources()) {
- + CHECK(resource.has_allocation_info());
- + }
- +
- + // Generate an ID for the executor's container.
- + // TODO(idownes) This should be done by the containerizer but we
- + // need the ContainerID to create the executor's directory. Fix
- + // this when 'launchExecutor()' is handled asynchronously.
- + ContainerID containerId;
- + containerId.set_value(UUID::random().toString());
- +
- + Option<string> user = None();
- +#ifndef __WINDOWS__
- + if (slave->flags.switch_user) {
- + // The command (either in form of task or executor command) can
- + // define a specific user to run as. If present, this precedes the
- + // framework user value. The selected user will have been verified by
- + // the master at this point through the active ACLs.
- + // NOTE: The global invariant is that the executor info at this
- + // point is (1) the user provided task.executor() or (2) a command
- + // executor constructed by the slave from the task.command().
- + // If this changes, we need to check the user in both
- + // task.command() and task.executor().command() below.
- + user = info.user();
- + if (executorInfo.command().has_user()) {
- + user = executorInfo.command().user();
- + }
- + }
- +#endif // __WINDOWS__
- +
- + // Create a directory for the executor.
- + const string directory = paths::createExecutorDirectory(
- + slave->flags.work_dir,
- + slave->info.id(),
- + id(),
- + executorInfo.executor_id(),
- + containerId,
- + user);
- +
- + Executor* executor = new Executor(
- + slave,
- + id(),
- + executorInfo,
- + containerId,
- + directory,
- + user,
- + info.checkpoint());
- +
- + if (executor->checkpoint) {
- + executor->checkpointExecutor();
- + }
- +
- + CHECK(!executors.contains(executorInfo.executor_id()))
- + << "Unknown executor '" << executorInfo.executor_id() << "'";
- +
- + executors[executorInfo.executor_id()] = executor;
- +
- + const ExecutorID& executorId = executorInfo.executor_id();
- + const PID<Slave> slavePid = self();
- +
- + auto authorize =
- + [slavePid, executorId, frameworkId](const Option<Principal>& principal) {
- + return dispatch(
- + slavePid,
- + &Slave::authorizeSandboxAccess,
- + principal,
- + frameworkId,
- + executorId);
- + };
- +
- + files->attach(executor->directory, executor->directory, authorize)
- + .onAny(defer(self(), &Self::fileAttached, lambda::_1, executor->directory));
- +
- + return executor;
- +}
- +
- +
- +Future<Secret> Slave::generateSecret(
- + const FrameworkID& frameworkId,
- + const ExecutorID& executorId,
- + const ContainerID& containerId)
- +{
- + Principal principal(
- + Option<string>::none(),
- + {
- + {"cid", containerId.value()},
- + {"eid", executorId.value()},
- + {"fid", frameworkId.value()}
- + });
- +
- + return secretGenerator->generate(principal)
- + .then([](const Secret& secret) {
- + Option<Error> error = validateSecret(secret);
- + if (error.isSome()) {
- + return Future<Secret>::failure(
- + "Secret generator produced an invalid secret: " +
- + error.get().message);
- + } else if (secret.type() != Secret::VALUE) {
- + return Future<Secret>::failure(
- + "Secret generator produced a secret that is not VALUE "
- + "type; only VALUE-type secrets are supported at this time");
- +
- + }
- + return secret;
- + });
- +}
- +
- +
- void Slave::_launchExecutor(
- const FrameworkID& frameworkId,
- const ExecutorInfo& executorInfo,
- @@ -2780,11 +2865,6 @@ void Slave::_launchExecutor(
- LOG(WARNING) << "Ignoring running " << taskOrTaskGroup(taskInfo, taskGroup)
- << " of framework " << frameworkId
- << " because the framework is terminating";
- -
- - if (framework->executors.empty() && framework->pending.empty()) {
- - removeFramework(framework);
- - }
- -
- return;
- }
- diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
- index 6a06093..cfe81ea 100644
- --- a/src/slave/slave.hpp
- +++ b/src/slave/slave.hpp
- @@ -362,7 +362,7 @@ public:
- // If an executor is launched for a task group, `taskInfo` would
- // not be set.
- Executor* launchExecutor(
- - const FrameworkID& frameworkId,
- + Framework* framework,
- const ExecutorInfo& executorInfo,
- const Option<TaskInfo>& taskInfo,
- const Option<TaskGroupInfo>& taskGroup);
- @@ -1120,6 +1120,7 @@ struct Framework
- ~Framework();
- void destroyExecutor(const ExecutorID& executorId);
- + Executor* addExecutor(const ExecutorInfo& executorInfo);
- Executor* getExecutor(const ExecutorID& executorId) const;
- Executor* getExecutor(const TaskID& taskId) const;
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement