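- @Override
- public void init(ProcessorContext context) {
-     stateStore = (KeyValueStore<String, IotEvent>) context.getStateStore("eventstore");
-     // Run the cleanup every 8 hours of wall-clock time.
-     context.schedule(Duration.ofHours(8), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
-         // Iterate over all records and delete those already marked as processed.
-         // all() returns a KeyValueIterator (org.apache.kafka.streams.state.KeyValueIterator),
-         // which must be closed to release its resources, hence try-with-resources.
-         try (KeyValueIterator<String, IotEvent> iterator = stateStore.all()) {
-             iterator.forEachRemaining(keyValue -> {
-                 if (keyValue.value.getEventType() == EventType.PROCESSED) {
-                     stateStore.delete(keyValue.key);
-                 }
-             });
-         }
-     });
- }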