Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
- index f5c4c31d0..7af8116da 100644
- --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
- +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
- @@ -34,6 +34,8 @@ import java.nio.file.NoSuchFileException;
- import java.nio.file.Path;
- import java.nio.file.StandardOpenOption;
- import java.util.HashMap;
- +import java.util.HashSet;
- +import java.util.Set;
- import java.util.regex.Pattern;
- /**
- @@ -267,6 +269,11 @@ public class StateDirectory {
- }
- }
- + private final Set<TaskId> committedTasks = new HashSet<>(); // task ids recorded via markTaskCommitted; accessed only while holding this object's monitor
- + synchronized void markTaskCommitted(final TaskId taskId) { // synchronized because cleanRemovedTasks reads this HashSet under the same monitor, and HashSet is not thread-safe
- + committedTasks.add(taskId);
- + }
- +
- private synchronized void cleanRemovedTasks(final long cleanupDelayMs,
- final boolean manualUserCall) throws Exception {
- final File[] taskDirs = listTaskDirectories();
- @@ -282,7 +289,8 @@ public class StateDirectory {
- if (lock(id)) {
- final long now = time.milliseconds();
- final long lastModifiedMs = taskDir.lastModified();
- - if (now > lastModifiedMs + cleanupDelayMs || manualUserCall) {
- + final boolean canClean = committedTasks.contains(id) && now > lastModifiedMs + cleanupDelayMs;
- + if (canClean || manualUserCall) {
- if (!manualUserCall) {
- log.info(
- "{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms).",
- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
- index 3d974a572..a5df6a7db 100644
- --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
- +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
- @@ -499,7 +499,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
- } catch (final CommitFailedException | ProducerFencedException error) {
- throw new TaskMigratedException(this, error);
- }
- -
- + stateDirectory.markTaskCommitted(id);
- commitNeeded = false;
- commitRequested = false;
- taskMetrics.taskCommitTimeSensor.record(time.nanoseconds() - startNs);
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement