Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package com.mycorp.flink;
- import java.util.Date;
- import java.util.Map;
- import org.apache.flink.api.common.functions.FlatMapFunction;
- import org.apache.flink.cep.CEP;
- import org.apache.flink.cep.PatternSelectFunction;
- import org.apache.flink.cep.PatternStream;
- import org.apache.flink.cep.PatternTimeoutFunction;
- import org.apache.flink.cep.pattern.Pattern;
- import org.apache.flink.streaming.api.TimeCharacteristic;
- import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
- import org.apache.flink.streaming.api.watermark.Watermark;
- import org.apache.flink.streaming.api.windowing.time.Time;
- import org.apache.flink.types.Either;
- import org.apache.flink.util.Collector;
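- /**
- * Flink CEP example that reads lines from a socket text stream and prints pattern matches and timeouts.
- *
- * <p>To start an example socket text stream on your local machine, run netcat from a command line:
- * <code>nc -lk 9999</code>, where the parameter specifies the port number.
- */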
- public class CEPTest {
- public static void main(String[] args) throws Exception {
- if (args.length != 2) {
- System.err.println("USAGE:\nCEPTest <hostname> <port>");
- return;
- }
- String hostName = args[0];
- Integer port = Integer.parseInt(args[1]);
- // Set up the execution environment.
- final StreamExecutionEnvironment env = StreamExecutionEnvironment
- .getExecutionEnvironment();
- // Enable to use event time rather than processing time.
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
- // The pattern.
- Pattern<Event, Event> pattern = Pattern.<Event>begin("start")
- .where(evt -> evt.key.length() > 0)
- .next("last").where(evt -> evt.key.length() > 0).within(Time.seconds(3));
- // Read lines from socket.
- DataStream<String> text = env.socketTextStream(hostName, port);
- // Add tmst, key by input line.
- DataStream<Event> tuples = text.flatMap(new EventExtractor()).keyBy("key");
- // Extract tmst, key by input key.
- DataStream<Event> withTimestampsAndWatermarks = tuples
- .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Event>() {
- @Override
- public long extractAscendingTimestamp(Event element) {
- return element.tmst;
- }
- }).keyBy("key");
- // Apply pattern filtering on stream.
- PatternStream<Event> patternStream = CEP.pattern(withTimestampsAndWatermarks, pattern);
- // Select.
- DataStream<Either<String, String>> result = patternStream.select(
- new PatternTimeoutFunction<Event, String>() {
- @Override
- public String timeout(Map<String, Event> arg0, long arg1) throws Exception {
- String out = "Timeout\n";
- for (Map.Entry<String, Event> e: arg0.entrySet()) {
- out += String.format("%s:%s\n", e.getKey(), e.getValue());
- }
- return out + String.format("at: %s.", new Date(arg1));
- }
- },
- new PatternSelectFunction<Event, String>() {
- @Override
- public String select(Map<String, Event> arg0) throws Exception {
- String out = "Match\n";
- for (Map.Entry<String, Event> e: arg0.entrySet()) {
- out += String.format("%s:%s\n", e.getKey(), e.getValue());
- }
- return out;
- }
- });
- result.print();
- env.execute("Flink CEP test.");
- }
- /**
- * Just emit (input text, tmst) tuple.
- */
- public static final class EventExtractor implements FlatMapFunction<String, Event> {
- @Override
- public void flatMap(String value, Collector<Event> out) {
- String splits[] = value.split(" ");
- out.collect(new Event(splits[0], splits[1]));
- }
- }
- /**
- * Event
- */
- public static final class Event {
- public String key;
- public String value;
- public long tmst;
- public Event() {
- }
- public Event(String key, String value) {
- this.key = key;
- this.value = value;
- this.tmst = System.currentTimeMillis();
- }
- @Override
- public String toString() {
- return String.format("key: %s, value: %s, tmst: %s", key, value, new Date(tmst));
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement