Advertisement
Guest User

Untitled

a guest
Aug 2nd, 2015
221
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.52 KB | None | 0 0
  1. import org.slf4j.Logger;
  2. import org.slf4j.LoggerFactory;
  3. import org.wso2.siddhi.core.event.ComplexEvent;
  4. import org.wso2.siddhi.core.event.ComplexEventChunk;
  5. import org.wso2.siddhi.core.event.Event;
  6. import org.wso2.siddhi.core.event.stream.StreamEvent;
  7. import org.wso2.siddhi.core.query.output.callback.QueryCallback;
  8. import java.util.Arrays;
  9.  
  10. public abstract class CustomQueryCallback extends QueryCallback {
  11.  
  12. private static final Logger log = LoggerFactory.getLogger(CustomQueryCallback.class);
  13.  
  14. public void receiveStreamEvent(ComplexEventChunk complexEventChunk) {
  15. while (complexEventChunk.hasNext()) {
  16. ComplexEvent streamEvent = complexEventChunk.next();
  17. Event event = new Event(streamEvent.getOutputData().length).copyFrom(streamEvent);
  18. Event[] events = new Event[]{event};
  19. long timestamp = (streamEvent.getType() == StreamEvent.Type.EXPIRED ? streamEvent.getTimestamp() : (long) streamEvent.getOutputData()[2]);
  20. if (streamEvent.getType() == StreamEvent.Type.EXPIRED){
  21. send(timestamp, null, events);
  22. } else {
  23. send(timestamp, events, null);
  24. }
  25. }
  26. }
  27.  
  28. private void send(long timeStamp, Event[] currentEvents, Event[] expiredEvents) {
  29. try {
  30. receive(timeStamp, currentEvents, expiredEvents);
  31. } catch (RuntimeException e) {
  32. log.error("Error on sending events" + Arrays.deepToString(currentEvents) + ", " + Arrays.deepToString(expiredEvents), e);
  33. }
  34. }
  35.  
  36. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement