a guest
Jul 20th, 2019
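// Imports needed by the enclosing Processor class:
import java.time.Duration;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
    stateStore = (KeyValueStore<String, IotEvent>) context.getStateStore("eventstore");
    // Every 8 hours of wall-clock time, purge events that have already been processed.
    context.schedule(Duration.ofHours(8), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
        // all() returns a KeyValueIterator that must be closed to release store resources.
        try (KeyValueIterator<String, IotEvent> iterator = stateStore.all()) {
            iterator.forEachRemaining(keyValue -> {
                if (keyValue.value.getEventType() == EventType.PROCESSED) {
                    stateStore.delete(keyValue.key);
                }
            });
        }
    });
}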