Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.wso2.siddhi.core.event.ComplexEvent;
- import org.wso2.siddhi.core.event.ComplexEventChunk;
- import org.wso2.siddhi.core.event.Event;
- import org.wso2.siddhi.core.event.stream.StreamEvent;
- import org.wso2.siddhi.core.query.output.callback.QueryCallback;
- import java.util.Arrays;
- public abstract class CustomQueryCallback extends QueryCallback {
- private static final Logger log = LoggerFactory.getLogger(CustomQueryCallback.class);
- public void receiveStreamEvent(ComplexEventChunk complexEventChunk) {
- while (complexEventChunk.hasNext()) {
- ComplexEvent streamEvent = complexEventChunk.next();
- Event event = new Event(streamEvent.getOutputData().length).copyFrom(streamEvent);
- Event[] events = new Event[]{event};
- long timestamp = (streamEvent.getType() == StreamEvent.Type.EXPIRED ? streamEvent.getTimestamp() : (long) streamEvent.getOutputData()[2]);
- if (streamEvent.getType() == StreamEvent.Type.EXPIRED){
- send(timestamp, null, events);
- } else {
- send(timestamp, events, null);
- }
- }
- }
- private void send(long timeStamp, Event[] currentEvents, Event[] expiredEvents) {
- try {
- receive(timeStamp, currentEvents, expiredEvents);
- } catch (RuntimeException e) {
- log.error("Error on sending events" + Arrays.deepToString(currentEvents) + ", " + Arrays.deepToString(expiredEvents), e);
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement