Advertisement
Guest User

Untitled

a guest
Nov 20th, 2018
135
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 18.79 KB | None | 0 0
  1. package ctic.smartcity.savia;
  2.  
  3. import ctic.smartcity.savia.Data.Agresor;
  4. import ctic.smartcity.savia.Data.Alarma;
  5. import ctic.smartcity.savia.Data.Comparador;
  6. import ctic.smartcity.savia.Data.Victima;
  7. import org.apache.flink.api.common.functions.MapFunction;
  8.  
  9. import org.apache.flink.api.common.functions.Partitioner;
  10. import org.apache.flink.api.common.state.ValueState;
  11. import org.apache.flink.api.common.state.ValueStateDescriptor;
  12. import org.apache.flink.api.common.typeinfo.TypeHint;
  13. import org.apache.flink.api.common.typeinfo.TypeInformation;
  14. import org.apache.flink.api.java.functions.KeySelector;
  15. import org.apache.flink.cep.CEP;
  16. import org.apache.flink.cep.PatternSelectFunction;
  17. import org.apache.flink.cep.PatternStream;
  18. import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
  19. import org.apache.flink.cep.pattern.Pattern;
  20. import org.apache.flink.cep.pattern.conditions.IterativeCondition;
  21.  
  22. import org.apache.flink.configuration.Configuration;
  23. import org.apache.flink.shaded.curator.org.apache.curator.shaded.com.google.common.collect.Lists;
  24. import org.apache.flink.streaming.api.TimerService;
  25. import org.apache.flink.streaming.api.datastream.DataStream;
  26. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  27.  
  28. import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
  29. import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
  30. import org.apache.flink.streaming.api.functions.ProcessFunction;
  31. import org.apache.flink.streaming.api.functions.source.SourceFunction;
  32. import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
  33. import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
  34. import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
  35. import org.apache.flink.streaming.api.watermark.Watermark;
  36. import org.apache.flink.streaming.api.windowing.assigners.SlidingAlignedProcessingTimeWindows;
  37. import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
  38. import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
  39. import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
  40. import org.apache.flink.streaming.api.windowing.time.Time;
  41. import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
  42. import org.apache.flink.util.Collector;
  43. import org.eclipse.paho.client.mqttv3.*;
  44. import org.json.JSONException;
  45. import org.json.JSONObject;
  46.  
  47.  
  48. import javax.annotation.Nullable;
  49. import java.net.ServerSocket;
  50. import java.sql.Timestamp;
  51. import java.text.DateFormat;
  52. import java.text.ParseException;
  53. import java.text.SimpleDateFormat;
  54. import java.util.*;
  55.  
  56. import static java.lang.Thread.sleep;
  57.  
  58. public class CEPMonitor {
  59.  
  60.     public static final int timeSentSec = 60;
  61.  
  62.     public static void main(String[] args) throws Exception {
  63.         final StreamExecutionEnvironment env = StreamExecutionEnvironment
  64.                 .getExecutionEnvironment()
  65.                 ;
  66.  
  67.         DataStream<PosicionEvent> sensadoInput0 = env.addSource(new MiMQTTSource())
  68.                 .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<PosicionEvent>() {
  69.                     @Override
  70.                     public long extractAscendingTimestamp(PosicionEvent element) {
  71.                         return StringDatetoLong(element.getDate());
  72.                     }
  73.  
  74.                 })
  75.                 .keyBy(new KeySelector<PosicionEvent, String>() {
  76.                     @Override
  77.                     public String getKey(PosicionEvent posicionEvent) throws Exception {
  78.                         return posicionEvent.getRelacion();
  79.                     }
  80.                 })
  81.                 ;
  82.  
  83.         Pattern<PosicionEvent,?> alarmaClose = Pattern.<PosicionEvent>
  84.                 begin("FirstSensadoEvent")
  85.                 .followedBy("SecondSensadoEvent").where(
  86.                         new IterativeCondition<PosicionEvent>(){
  87.                             @Override
  88.                             public boolean filter(PosicionEvent posicionEvent, Context<PosicionEvent> context) throws Exception {
  89.                                 String typePreviuos="";
  90.                                 for (PosicionEvent mposicionevent : context.getEventsForPattern("FirstSensadoEvent")){
  91.                                     typePreviuos= mposicionevent.getType();
  92.                                     if((typePreviuos.equals("Victima") && posicionEvent.getType().equals("Agresor"))
  93.                                             || (typePreviuos.equals("Agresor") && posicionEvent.getType().equals("Victima"))){
  94.                                         if(Utils.areClose(mposicionevent.getLatitude(),mposicionevent.getLongitude(),
  95.                                                 posicionEvent.getLatitude(),posicionEvent.getLongitude())){
  96.                                             return true;
  97.                                         }
  98.                                     }
  99.                                     return false;
  100.                                 }
  101.                                 return false;
  102.                             }
  103.                         }
  104.                 ).within(Time.seconds(timeSentSec*3/2));
  105.  
  106.         PatternStream<PosicionEvent> patternStreamClose = CEP.pattern(sensadoInput0, alarmaClose);
  107.         DataStream<Alarma> alertsClose = patternStreamClose.select(new PatternSelectFunction<PosicionEvent, Alarma>() {
  108.             @Override
  109.             public Alarma select(Map<String, List<PosicionEvent>> map) throws Exception {
  110.                 PosicionEvent posicionEvent = map.get("FirstSensadoEvent").get(0);
  111.                 PosicionEvent posicionEvent1 = map.get("SecondSensadoEvent").get(0);
  112.                 //addcounter();
  113.                 if(posicionEvent.getType().equals("Victima"))
  114.                     return new Alarma(posicionEvent,posicionEvent1);
  115.                 else return new Alarma(posicionEvent1,posicionEvent);
  116.             }
  117.         });
  118. /*
  119.         Pattern<PosicionEvent,?> alarmaPrevencion = Pattern.<PosicionEvent>
  120.                 begin("begin").times(3)//.allowCombinations()//.consecutive()
  121.                 .followedBy("followed").where(
  122.                         new IterativeCondition <PosicionEvent>(){
  123.                             @Override
  124.                             public boolean filter(PosicionEvent posicionEvent, Context<PosicionEvent> context) throws Exception {
  125.                                 ArrayList<PosicionEvent> victimaList = new ArrayList<>();
  126.                                 ArrayList<PosicionEvent> agresorList = new ArrayList<>();
  127.                                 for (PosicionEvent posicionEvent1 : context.getEventsForPattern("begin")) {
  128.                                     if(posicionEvent1.getType().equals("Agresor")) agresorList.add(posicionEvent1);
  129.                                     else if(posicionEvent1.getType().equals("Victima")) victimaList.add(posicionEvent1);
  130.                                 }
  131.                                 if(posicionEvent.getType().equals("Agresor")) agresorList.add(posicionEvent);
  132.                                 else if(posicionEvent.getType().equals("Victima")) victimaList.add(posicionEvent);
  133.  
  134.                                 if(victimaList.size()==2 && agresorList.size()==2) {
  135.                                     if (Utils.DetectandPrevent(
  136.                                             victimaList.get(0).getLatitude(), victimaList.get(0).getLongitude(),
  137.                                             victimaList.get(1).getLatitude(), victimaList.get(1).getLongitude(),
  138.                                             agresorList.get(0).getLatitude(), agresorList.get(0).getLongitude(),
  139.                                             agresorList.get(1).getLatitude(), agresorList.get(1).getLongitude())) {
  140.                                         return true;
  141.                                     }
  142.                                 }
  143.                                 return false;
  144.                             }
  145.                         }).within(Time.seconds(timeSentSec*5/2));
  146.  
  147.         PatternStream<PosicionEvent> patternStreamAlarm = CEP.pattern(sensadoInput0, alarmaPrevencion);
  148.         DataStream<Alarma> alertaAlarma = patternStreamAlarm.select(new PatternSelectFunction<PosicionEvent, Alarma>() {
  149.             @Override
  150.             public Alarma select(Map<String, List<PosicionEvent>> map) throws Exception {
  151.                 ArrayList<PosicionEvent> victimaList = new ArrayList<>();
  152.                 ArrayList<PosicionEvent> agresorList = new ArrayList<>();
  153.                 for (PosicionEvent positionEvent : Lists.newArrayList(map.get("begin").listIterator())) {
  154.                     if (positionEvent instanceof Agresor) agresorList.add(positionEvent);
  155.                     else if (positionEvent instanceof Victima) victimaList.add(positionEvent);
  156.                 }
  157.                 PosicionEvent posicionEvent = map.get("followed").get(0);
  158.                 if(posicionEvent instanceof Agresor) agresorList.add(posicionEvent);
  159.                 else if(posicionEvent instanceof Victima) victimaList.add(posicionEvent);
  160.                 return new Alarma(victimaList,agresorList);
  161.             }
  162.         });
  163.  
  164.  
  165.         Pattern<PosicionEvent,?> alarma3 = Pattern.<PosicionEvent>
  166. //                begin("PrimerosSensados",AfterMatchSkipStrategy.skipPastLastEvent())
  167.                 begin("PrimerosSensados",AfterMatchSkipStrategy.skipToLast("PrimerosSensados"))
  168.                 .times(7).consecutive()//
  169.                 .next("SegundoSensado").where(new IterativeCondition<PosicionEvent>() {
  170.                     @Override
  171.                     public boolean filter(PosicionEvent posicionEvent, Context<PosicionEvent> context) throws Exception {
  172.                         ArrayList<PosicionEvent> victimaList = new ArrayList<>();
  173.                         ArrayList<PosicionEvent> agresorList = new ArrayList<>();
  174.                         for (PosicionEvent posicionEvent1 : context.getEventsForPattern("PrimerosSensados")) {
  175.                             if(posicionEvent1.getType().equals("Agresor")) agresorList.add(posicionEvent1);
  176.                             else if(posicionEvent1.getType().equals("Victima")) victimaList.add(posicionEvent1);
  177.                         }
  178.                         if(posicionEvent.getType().equals("Agresor")) agresorList.add(posicionEvent);
  179.                         else if(posicionEvent.getType().equals("Victima")) victimaList.add(posicionEvent);
  180.  
  181.                         if(victimaList.size()>1 && agresorList.size()>1){
  182.                             int sizeVictim = victimaList.size();
  183.                             int sizeAgresor = agresorList.size();
  184.                             for(int i=sizeAgresor-2;i>-1;i--)
  185.                                 if(Utils.areClose(victimaList.get(sizeVictim-1).getLatitude(), victimaList.get(sizeVictim-1).getLongitude(),
  186.                                         agresorList.get(i).getLatitude(), agresorList.get(i).getLongitude())){
  187.                                     return true;
  188.                                 }
  189.                             for(int i=sizeVictim-2;i>-1;i--)
  190.                                 if(Utils.areClose(victimaList.get(i).getLatitude(), victimaList.get(i).getLongitude(),
  191.                                         agresorList.get(sizeAgresor-1).getLatitude(), agresorList.get(sizeAgresor-1).getLongitude())){
  192.                                     return true;
  193.                                 }
  194.                         }
  195.                         return false;
  196.                     }
  197.                 })
  198.                 .within(Time.seconds(timeSentSec*7/2));
  199.  
  200.         PatternStream<PosicionEvent> patternStreamAlarm3 = CEP.pattern(sensadoInput0, alarma3);
  201.         DataStream<Alarma> alertaAlarma3 = patternStreamAlarm3.select(new PatternSelectFunction<PosicionEvent, Alarma>() {
  202.             @Override
  203.             public Alarma select(Map<String, List<PosicionEvent>> map) throws Exception {
  204.                 ArrayList<PosicionEvent> victimaList = new ArrayList<>();
  205.                 ArrayList<PosicionEvent> agresorList = new ArrayList<>();
  206.                 for (PosicionEvent positionEvent : Lists.newArrayList(map.get("PrimerosSensados").listIterator()))
  207.                     if(positionEvent.getType().equals("Agresor")) agresorList.add(positionEvent);
  208.                     else if(positionEvent instanceof Victima) victimaList.add(positionEvent);
  209.                 for (PosicionEvent positionEvent : Lists.newArrayList(map.get("SegundoSensado").listIterator()))
  210.                     if(positionEvent instanceof Agresor) agresorList.add(positionEvent);
  211.                     else if(positionEvent instanceof Victima) victimaList.add(positionEvent);
  212.  
  213.                 int sizeVictim = victimaList.size();
  214.                 int sizeAgresor = agresorList.size();
  215.                 for(int i=sizeAgresor-2;i>-1;i--)
  216.                     if(Utils.areClose(victimaList.get(sizeVictim-1).getLatitude(), victimaList.get(sizeVictim-1).getLongitude(),
  217.                             agresorList.get(i).getLatitude(), agresorList.get(i).getLongitude())){
  218.                         return new Alarma(victimaList.get(sizeVictim-1),agresorList.get(sizeAgresor-1),agresorList.get(i),sizeVictim+"_"+sizeAgresor);
  219.                     }
  220.                 for(int i=sizeVictim-2;i>-1;i--)
  221.                     if(Utils.areClose(victimaList.get(i).getLatitude(), victimaList.get(i).getLongitude(),
  222.                             agresorList.get(sizeAgresor-1).getLatitude(), agresorList.get(sizeAgresor-1).getLongitude())){
  223.                         return new Alarma(victimaList.get(sizeVictim-1), agresorList.get(sizeAgresor-1),victimaList.get(i),sizeVictim+"_"+sizeAgresor);
  224.                     }
  225.                 System.out.println("ERROR NO HAY SEGUIMIENTO...");
  226.  
  227.                 return new Alarma(victimaList.get(sizeVictim-1), agresorList.get(sizeAgresor-1));
  228.             }
  229.         });
  230. */
  231.         env.execute("GioCEP for Savia - Smartcity ");
  232.  
  233.     }
  234.  
  235.     public static void printArraySEs(ArrayList<PosicionEvent> victimaList,ArrayList<PosicionEvent> agresorList, int seg, int v,int a){
  236.         String ele="";
  237.         for(int i=0;i<victimaList.size();i++)
  238.             ele +=victimaList.get(i).getDate()+" , ";
  239.         System.out.println("Vic"+seg+":"+victimaList.size());
  240.         ele="";
  241.         for(int i=0;i<agresorList.size();i++)
  242.             ele +=agresorList.get(i).getDate()+" , ";
  243.         System.out.println("Agre"+seg+":"+agresorList.size());
  244.         System.out.println(victimaList.get(v).getDate()+"_"+agresorList.get(a).getDate());
  245.     }
  246.  
  247.     public static final class MisElementos implements MapFunction<String,PosicionEvent> {
  248.         @Override
  249.         public PosicionEvent map(String contentAsString) throws Exception {
  250.             try {
  251.                 JSONObject jsonObject = new JSONObject(contentAsString);
  252.                 String type =jsonObject.getString("type");
  253.                 if (type.equals("Agresor"))             return new Agresor(jsonObject);
  254.                 else if(type.equals("Victima"))       return new Victima(jsonObject);
  255.  
  256.                 else return new PosicionEvent();
  257.             } catch (JSONException e) {
  258.                 System.out.println("No es JSON:"+contentAsString+"_"+e.toString() );
  259.                 //  +"counterAlarm:"+counter+"_counterMQTT:"+counterMQTT+"_counterSocket:");
  260.                 return new PosicionEvent();
  261.             }
  262.         }
  263.     }
  264.  
  265.     public static class MiMQTTSource implements SourceFunction<PosicionEvent> {
  266.         private String user = "CTIC-SMARTCITY";
  267.         private String password = "YTICTRAMS-CITC";
  268.         private volatile boolean isRunning = true;
  269.         @Override
  270.         public void run(SourceContext<PosicionEvent> ctx) throws Exception {
  271.             MqttClient client;
  272.             try {
  273.                 client = new MqttClient("tcp://localhost", MqttClient.generateClientId());
  274.                 MqttConnectOptions options = new MqttConnectOptions();
  275.                 options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
  276.                 options.setUserName(user);
  277.                 options.setPassword(password.toCharArray());
  278.                 options.setCleanSession(false);
  279.                 options.setAutomaticReconnect(true);
  280.                 client.setCallback(new MqttCallback() {
  281.                     @Override
  282.                     public void connectionLost(Throwable throwable) {
  283.                         System.out.println("Desconectado MQTT del Socket! "+new SimpleDateFormat("HH:mm:ss").format(new Date()));
  284.                     }
  285.                     @Override
  286.                     public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
  287.                         System.out.println("topic: "+topic);
  288.                         String re_type[] = topic.split("/");
  289.                         try {
  290.                             JSONObject jsonObject = new JSONObject(new String(mqttMessage.getPayload()));
  291.                             if (re_type[1].equals("Agresor")){
  292.                                 ctx.collect(new Agresor(jsonObject,re_type[0],re_type[1]));
  293.                             }
  294.                             else if (re_type[1].equals("Victima")){
  295.                                 ctx.collect(new Victima(jsonObject,re_type[0],re_type[1]));
  296.                             }
  297.                         }catch (JSONException e) {
  298.                             System.out.println("No es JSON:"+new String(mqttMessage.getPayload())+"_"+e.toString() );
  299.                             //+"counterSegu:"+ParejasSeguimiento+"_counterMQTT:"+counterMQTT);
  300.                         }
  301.                     }
  302.  
  303.                     @Override
  304.                     public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
  305.  
  306.                     }
  307.                 });
  308.                 client.connect(options);
  309.                 client.subscribe("+/+");
  310.             } catch (MqttException e) {
  311.                 System.out.println("Expcecion hilo "+e.toString());
  312.             }
  313.             while (!Thread.currentThread().isInterrupted() && isRunning) {
  314.                 try {
  315.                     sleep(Integer.MAX_VALUE);
  316.                 }catch (InterruptedException ex) {
  317.                     Thread.currentThread().interrupt();
  318.                 }
  319.             }
  320.         }
  321.  
  322.         @Override
  323.         public void cancel() {
  324.             isRunning = false;
  325.         }
  326.     }
  327.  
  328.     public static long StringDatetoLong(String mdate){
  329.         Date date=new Date();
  330.         try {                                 //2018-04-13 18:28:43
  331.             date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(mdate);
  332.         } catch (ParseException e) {
  333.             System.out.println("Error parsing DATE "+e.toString());
  334.             //e.printStackTrace();
  335.         }
  336.         Timestamp timestamp = new Timestamp(date.getTime());
  337.         return timestamp.getTime();
  338.     }
  339. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement