Advertisement
Guest User

Untitled

a guest
Apr 26th, 2019
142
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.65 KB | None | 0 0
  1. diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
  2. index f5c4c31d0..7af8116da 100644
  3. --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
  4. +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
  5. @@ -34,6 +34,8 @@ import java.nio.file.NoSuchFileException;
  6. import java.nio.file.Path;
  7. import java.nio.file.StandardOpenOption;
  8. import java.util.HashMap;
  9. +import java.util.HashSet;
  10. +import java.util.Set;
  11. import java.util.regex.Pattern;
  12.  
  13. /**
  14. @@ -267,6 +269,11 @@ public class StateDirectory {
  15. }
  16. }
  17.  
  18. + private Set<TaskId> committedTasks = new HashSet<>();
  19. + void markTaskCommitted(TaskId taskId) {
  20. + committedTasks.add(taskId);
  21. + }
  22. +
  23. private synchronized void cleanRemovedTasks(final long cleanupDelayMs,
  24. final boolean manualUserCall) throws Exception {
  25. final File[] taskDirs = listTaskDirectories();
  26. @@ -282,7 +289,8 @@ public class StateDirectory {
  27. if (lock(id)) {
  28. final long now = time.milliseconds();
  29. final long lastModifiedMs = taskDir.lastModified();
  30. - if (now > lastModifiedMs + cleanupDelayMs || manualUserCall) {
  31. + final boolean canClean = committedTasks.contains(id) && now > lastModifiedMs + cleanupDelayMs;
  32. + if (canClean || manualUserCall) {
  33. if (!manualUserCall) {
  34. log.info(
  35. "{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms).",
  36. diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
  37. index 3d974a572..a5df6a7db 100644
  38. --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
  39. +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
  40. @@ -499,7 +499,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
  41. } catch (final CommitFailedException | ProducerFencedException error) {
  42. throw new TaskMigratedException(this, error);
  43. }
  44. -
  45. + stateDirectory.markTaskCommitted(id);
  46. commitNeeded = false;
  47. commitRequested = false;
  48. taskMetrics.taskCommitTimeSensor.record(time.nanoseconds() - startNs);
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement