Advertisement
tha-dude

CEPTest.java

Oct 9th, 2016
96
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 4.89 KB | None | 0 0
  1. package com.mycorp.flink;
  2.  
  3. import java.util.Date;
  4. import java.util.Map;
  5.  
  6. import org.apache.flink.api.common.functions.FlatMapFunction;
  7. import org.apache.flink.cep.CEP;
  8. import org.apache.flink.cep.PatternSelectFunction;
  9. import org.apache.flink.cep.PatternStream;
  10. import org.apache.flink.cep.PatternTimeoutFunction;
  11. import org.apache.flink.cep.pattern.Pattern;
  12. import org.apache.flink.streaming.api.TimeCharacteristic;
  13. import org.apache.flink.streaming.api.datastream.DataStream;
  14. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  15. import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
  16. import org.apache.flink.streaming.api.watermark.Watermark;
  17. import org.apache.flink.streaming.api.windowing.time.Time;
  18. import org.apache.flink.types.Either;
  19. import org.apache.flink.util.Collector;
  20.  
  21. /**
  22.  * <p>
  23.  * To start an example socket text stream on your local machine run netcat from a command line:
  24.  * <code>nc -lk 9999</code>, where the parameter specifies the port number.
  25.  */
  26. public class CEPTest {
  27.    
  28.     public static void main(String[] args) throws Exception {
  29.  
  30.         if (args.length != 2) {
  31.             System.err.println("USAGE:\nCEPTest <hostname> <port>");
  32.             return;
  33.         }
  34.  
  35.         String hostName = args[0];
  36.         Integer port = Integer.parseInt(args[1]);
  37.  
  38.         // Set up the execution environment.
  39.         final StreamExecutionEnvironment env = StreamExecutionEnvironment
  40.                 .getExecutionEnvironment();
  41.        
  42.         // Enable to use event time rather than processing time.
  43.         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  44.        
  45.         // The pattern.
  46.         Pattern<Event, Event> pattern = Pattern.<Event>begin("start")
  47.                 .where(evt -> evt.key.length() > 0)
  48.                 .next("last").where(evt -> evt.key.length() > 0).within(Time.seconds(3));
  49.  
  50.         // Read lines from socket.
  51.         DataStream<String> text = env.socketTextStream(hostName, port);
  52.  
  53.         // Add tmst, key by input line.
  54.         DataStream<Event> tuples = text.flatMap(new EventExtractor()).keyBy("key");
  55.  
  56.         // Extract tmst, key by input key.
  57.         DataStream<Event> withTimestampsAndWatermarks = tuples
  58.                 .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Event>() {
  59.  
  60.                     @Override
  61.                     public long extractAscendingTimestamp(Event element) {
  62.                         return element.tmst;
  63.                     }
  64.                 }).keyBy("key");
  65.  
  66.         // Apply pattern filtering on stream.
  67.         PatternStream<Event> patternStream = CEP.pattern(withTimestampsAndWatermarks, pattern);
  68.        
  69.         // Select.
  70.         DataStream<Either<String, String>> result = patternStream.select(
  71.                
  72.                 new PatternTimeoutFunction<Event, String>() {
  73.  
  74.                     @Override
  75.                     public String timeout(Map<String, Event> arg0, long arg1) throws Exception {
  76.                         String out = "Timeout\n";
  77.                         for (Map.Entry<String, Event> e: arg0.entrySet()) {
  78.                             out += String.format("%s:%s\n", e.getKey(), e.getValue());
  79.                         }
  80.                         return out + String.format("at: %s.", new Date(arg1));
  81.                     }
  82.                    
  83.                 },
  84.  
  85.                 new PatternSelectFunction<Event, String>() {
  86.  
  87.                     @Override
  88.                     public String select(Map<String, Event> arg0) throws Exception {
  89.                         String out = "Match\n";
  90.                         for (Map.Entry<String, Event> e: arg0.entrySet()) {
  91.                             out += String.format("%s:%s\n", e.getKey(), e.getValue());
  92.                         }
  93.                         return out;
  94.                     }
  95.                 });
  96.  
  97.         result.print();
  98.        
  99.         env.execute("Flink CEP test.");
  100.     }
  101.    
  102.     /**
  103.      * Just emit (input text, tmst) tuple.
  104.      */
  105.     public static final class EventExtractor implements FlatMapFunction<String, Event> {
  106.  
  107.         @Override
  108.         public void flatMap(String value, Collector<Event> out) {
  109.             String splits[] = value.split(" ");
  110.             out.collect(new Event(splits[0], splits[1]));
  111.         }
  112.     }
  113.    
  114.     /**
  115.      * Event
  116.      */
  117.     public static final class Event {
  118.        
  119.         public String key;
  120.         public String value;
  121.         public long tmst;
  122.        
  123.         public Event() {
  124.         }
  125.        
  126.         public Event(String key, String value) {
  127.             this.key = key;
  128.             this.value = value;
  129.             this.tmst = System.currentTimeMillis();
  130.         }
  131.      
  132.         @Override
  133.         public String toString() {
  134.             return String.format("key: %s, value: %s, tmst: %s", key, value, new Date(tmst));
  135.         }
  136.        
  137.     }
  138.    
  139. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement