Advertisement
tha-dude

CEPTestWatermarkOverride.java

Oct 11th, 2016
61
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 5.63 KB | None | 0 0
  1. package com.mycorp.flink;
  2.  
  3. import java.util.Date;
  4. import java.util.Map;
  5.  
  6. import org.apache.flink.api.common.functions.FlatMapFunction;
  7. import org.apache.flink.cep.CEP;
  8. import org.apache.flink.cep.PatternSelectFunction;
  9. import org.apache.flink.cep.PatternStream;
  10. import org.apache.flink.cep.PatternTimeoutFunction;
  11. import org.apache.flink.cep.pattern.Pattern;
  12. import org.apache.flink.streaming.api.TimeCharacteristic;
  13. import org.apache.flink.streaming.api.datastream.DataStream;
  14. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  15. import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
  16. import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
  17. import org.apache.flink.streaming.api.watermark.Watermark;
  18. import org.apache.flink.streaming.api.windowing.time.Time;
  19. import org.apache.flink.types.Either;
  20. import org.apache.flink.util.Collector;
  21.  
  22. /**
  23.  * <p>
  24.  * To start an example socket text stream on your local machine run netcat from a command line:
  25.  * <code>nc -lk 9999</code>, where the parameter specifies the port number.
  26.  */
  27. public class CEPTestWatermarkOverride {
  28.    
  29.     public static void main(String[] args) throws Exception {
  30.  
  31.         if (args.length != 2) {
  32.             System.err.println("USAGE:\nCEPTestWatermarkOverride <hostname> <port>");
  33.             return;
  34.         }
  35.  
  36.         String hostName = args[0];
  37.         Integer port = Integer.parseInt(args[1]);
  38.  
  39.         // Set up the execution environment.
  40.         final StreamExecutionEnvironment env = StreamExecutionEnvironment
  41.                 .getExecutionEnvironment();
  42.        
  43.         //env.getConfig().setAutoWatermarkInterval(10000L);
  44.              
  45.         // Enable to use event time rather than processing time.
  46.         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  47.        
  48.         // The pattern.
  49.         Pattern<Event, Event> pattern = Pattern.<Event>begin("start")
  50.                 .where(evt -> evt.key.length() > 0)
  51.                 .next("last").where(evt -> evt.key.length() > 0).within(Time.seconds(5));
  52.  
  53.         // Read lines from socket.
  54.         DataStream<String> text = env.socketTextStream(hostName, port);
  55.  
  56.         // Add tmst, key by input line.
  57.         DataStream<Event> tuples = text.flatMap(new EventExtractor()).keyBy("key");
  58.  
  59.         // Extract tmst, key by input key.
  60.         DataStream<Event> withTimestampsAndWatermarks = tuples
  61.                 .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Event>() {
  62.                    
  63.                     long waterMarkTmst;
  64.                    
  65.                     @Override
  66.                     public long extractTimestamp(Event element, long previousElementTimestamp) {
  67.                         return element.tmst;
  68.                     }
  69.  
  70.                     @Override
  71.                     public Watermark getCurrentWatermark() {
  72.                         waterMarkTmst = System.currentTimeMillis() - 5000L;
  73.                         System.out.println(String.format("Watermark at %s", new Date(waterMarkTmst)));
  74.                         return new Watermark(waterMarkTmst);
  75.                     }
  76.                 }).keyBy("key");
  77.  
  78.         withTimestampsAndWatermarks.getExecutionConfig().setAutoWatermarkInterval(1000L);
  79.        
  80.         // Apply pattern filtering on stream.
  81.         PatternStream<Event> patternStream = CEP.pattern(withTimestampsAndWatermarks, pattern);
  82.        
  83.         // Select.
  84.         DataStream<Either<String, String>> result = patternStream.select(
  85.                
  86.                 new PatternTimeoutFunction<Event, String>() {
  87.  
  88.                     @Override
  89.                     public String timeout(Map<String, Event> arg0, long arg1) throws Exception {
  90.                         String out = "Timeout\n";
  91.                         for (Map.Entry<String, Event> e: arg0.entrySet()) {
  92.                             out += String.format("%s:%s\n", e.getKey(), e.getValue());
  93.                         }
  94.                         return out + String.format("at: %s.", new Date(arg1));
  95.                     }
  96.                    
  97.                 },
  98.  
  99.                 new PatternSelectFunction<Event, String>() {
  100.  
  101.                     @Override
  102.                     public String select(Map<String, Event> arg0) throws Exception {
  103.                         String out = "Match\n";
  104.                         for (Map.Entry<String, Event> e: arg0.entrySet()) {
  105.                             out += String.format("%s:%s\n", e.getKey(), e.getValue());
  106.                         }
  107.                         return out;
  108.                     }
  109.                 });
  110.  
  111.         result.print();
  112.        
  113.         env.execute("Flink CEP test.");
  114.     }
  115.    
  116.     /**
  117.      * Just emit (input text, tmst) tuple.
  118.      */
  119.     public static final class EventExtractor implements FlatMapFunction<String, Event> {
  120.  
  121.         @Override
  122.         public void flatMap(String value, Collector<Event> out) {
  123.             String splits[] = value.split(" ");
  124.             out.collect(new Event(splits[0], splits[1]));
  125.         }
  126.     }
  127.    
  128.     /**
  129.      * Event
  130.      */
  131.     public static final class Event {
  132.        
  133.         public String key;
  134.         public String value;
  135.         public long tmst;
  136.        
  137.         public Event() {
  138.         }
  139.        
  140.         public Event(String key, String value) {
  141.             this.key = key;
  142.             this.value = value;
  143.             this.tmst = System.currentTimeMillis();
  144.         }
  145.      
  146.         @Override
  147.         public String toString() {
  148.             return String.format("key: %s, value: %s, tmst: %s", key, value, new Date(tmst));
  149.         }
  150.        
  151.     }
  152.    
  153. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement