Advertisement
Guest User

Untitled

a guest
Nov 20th, 2018
124
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 37.58 KB | None | 0 0
  1. package ctic.smartcity.savia;
  2.  
  3. import ctic.smartcity.savia.Data.Agresor;
  4. import ctic.smartcity.savia.Data.Alarma;
  5. import ctic.smartcity.savia.Data.Comparador;
  6. import ctic.smartcity.savia.Data.Victima;
  7. import org.apache.flink.api.common.functions.MapFunction;
  8.  
  9. import org.apache.flink.api.common.functions.Partitioner;
  10. import org.apache.flink.api.common.state.ValueState;
  11. import org.apache.flink.api.common.state.ValueStateDescriptor;
  12. import org.apache.flink.api.common.typeinfo.TypeHint;
  13. import org.apache.flink.api.common.typeinfo.TypeInformation;
  14. import org.apache.flink.api.java.functions.KeySelector;
  15. import org.apache.flink.cep.CEP;
  16. import org.apache.flink.cep.PatternSelectFunction;
  17. import org.apache.flink.cep.PatternStream;
  18. import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
  19. import org.apache.flink.cep.pattern.Pattern;
  20. import org.apache.flink.cep.pattern.conditions.IterativeCondition;
  21.  
  22. import org.apache.flink.configuration.Configuration;
  23. import org.apache.flink.shaded.curator.org.apache.curator.shaded.com.google.common.collect.Lists;
  24. import org.apache.flink.streaming.api.TimerService;
  25. import org.apache.flink.streaming.api.datastream.DataStream;
  26. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  27.  
  28. import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;package ctic.smartcity.savia;
  29.  
  30. import ctic.smartcity.savia.Data.Agresor;
  31. import ctic.smartcity.savia.Data.Alarma;
  32. import ctic.smartcity.savia.Data.Comparador;
  33. import ctic.smartcity.savia.Data.Victima;
  34. import org.apache.flink.api.common.functions.MapFunction;
  35.  
  36. import org.apache.flink.api.common.functions.Partitioner;
  37. import org.apache.flink.api.common.state.ValueState;
  38. import org.apache.flink.api.common.state.ValueStateDescriptor;
  39. import org.apache.flink.api.common.typeinfo.TypeHint;
  40. import org.apache.flink.api.common.typeinfo.TypeInformation;
  41. import org.apache.flink.api.java.functions.KeySelector;
  42. import org.apache.flink.cep.CEP;
  43. import org.apache.flink.cep.PatternSelectFunction;
  44. import org.apache.flink.cep.PatternStream;
  45. import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
  46. import org.apache.flink.cep.pattern.Pattern;
  47. import org.apache.flink.cep.pattern.conditions.IterativeCondition;
  48.  
  49. import org.apache.flink.configuration.Configuration;
  50. import org.apache.flink.shaded.curator.org.apache.curator.shaded.com.google.common.collect.Lists;
  51. import org.apache.flink.streaming.api.TimerService;
  52. import org.apache.flink.streaming.api.datastream.DataStream;
  53. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  54.  
  55. import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
  56. import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
  57. import org.apache.flink.streaming.api.functions.ProcessFunction;
  58. import org.apache.flink.streaming.api.functions.source.SourceFunction;
  59. import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
  60. import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
  61. import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
  62. import org.apache.flink.streaming.api.watermark.Watermark;
  63. import org.apache.flink.streaming.api.windowing.assigners.SlidingAlignedProcessingTimeWindows;
  64. import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
  65. import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
  66. import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
  67. import org.apache.flink.streaming.api.windowing.time.Time;
  68. import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
  69. import org.apache.flink.util.Collector;
  70. import org.eclipse.paho.client.mqttv3.*;
  71. import org.json.JSONException;
  72. import org.json.JSONObject;
  73.  
  74.  
  75. import javax.annotation.Nullable;
  76. import java.net.ServerSocket;
  77. import java.sql.Timestamp;
  78. import java.text.DateFormat;
  79. import java.text.ParseException;
  80. import java.text.SimpleDateFormat;
  81. import java.util.*;
  82.  
  83. import static java.lang.Thread.sleep;
  84.  
  85. public class CEPMonitor {
  86.  
  87.     public static final int timeSentSec = 60;
  88.  
  89.     public static void main(String[] args) throws Exception {
  90.         final StreamExecutionEnvironment env = StreamExecutionEnvironment
  91.                 .getExecutionEnvironment()
  92.                 ;
  93.  
  94.         DataStream<PosicionEvent> sensadoInput0 = env.addSource(new MiMQTTSource())
  95.                 .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<PosicionEvent>() {
  96.                     @Override
  97.                     public long extractAscendingTimestamp(PosicionEvent element) {
  98.                         return StringDatetoLong(element.getDate());
  99.                     }
  100.  
  101.                 })
  102.                 .keyBy(new KeySelector<PosicionEvent, String>() {
  103.                     @Override
  104.                     public String getKey(PosicionEvent posicionEvent) throws Exception {
  105.                         return posicionEvent.getRelacion();
  106.                     }
  107.                 })
  108.                 ;
  109.  
  110.         Pattern<PosicionEvent,?> alarmaClose = Pattern.<PosicionEvent>
  111.                 begin("FirstSensadoEvent")
  112.                 .followedBy("SecondSensadoEvent").where(
  113.                         new IterativeCondition<PosicionEvent>(){
  114.                             @Override
  115.                             public boolean filter(PosicionEvent posicionEvent, Context<PosicionEvent> context) throws Exception {
  116.                                 String typePreviuos="";
  117.                                 for (PosicionEvent mposicionevent : context.getEventsForPattern("FirstSensadoEvent")){
  118.                                     typePreviuos= mposicionevent.getType();
  119.                                     if((typePreviuos.equals("Victima") && posicionEvent.getType().equals("Agresor"))
  120.                                             || (typePreviuos.equals("Agresor") && posicionEvent.getType().equals("Victima"))){
  121.                                         if(Utils.areClose(mposicionevent.getLatitude(),mposicionevent.getLongitude(),
  122.                                                 posicionEvent.getLatitude(),posicionEvent.getLongitude())){
  123.                                             return true;
  124.                                         }
  125.                                     }
  126.                                     return false;
  127.                                 }
  128.                                 return false;
  129.                             }
  130.                         }
  131.                 ).within(Time.seconds(timeSentSec*3/2));
  132.  
  133.         PatternStream<PosicionEvent> patternStreamClose = CEP.pattern(sensadoInput0, alarmaClose);
  134.         DataStream<Alarma> alertsClose = patternStreamClose.select(new PatternSelectFunction<PosicionEvent, Alarma>() {
  135.             @Override
  136.             public Alarma select(Map<String, List<PosicionEvent>> map) throws Exception {
  137.                 PosicionEvent posicionEvent = map.get("FirstSensadoEvent").get(0);
  138.                 PosicionEvent posicionEvent1 = map.get("SecondSensadoEvent").get(0);
  139.                 //addcounter();
  140.                 if(posicionEvent.getType().equals("Victima"))
  141.                     return new Alarma(posicionEvent,posicionEvent1);
  142.                 else return new Alarma(posicionEvent1,posicionEvent);
  143.             }
  144.         });
  145. /*
  146.         Pattern<PosicionEvent,?> alarmaPrevencion = Pattern.<PosicionEvent>
  147.                 begin("begin").times(3)//.allowCombinations()//.consecutive()
  148.                 .followedBy("followed").where(
  149.                         new IterativeCondition <PosicionEvent>(){
  150.                             @Override
  151.                             public boolean filter(PosicionEvent posicionEvent, Context<PosicionEvent> context) throws Exception {
  152.                                 ArrayList<PosicionEvent> victimaList = new ArrayList<>();
  153.                                 ArrayList<PosicionEvent> agresorList = new ArrayList<>();
  154.                                 for (PosicionEvent posicionEvent1 : context.getEventsForPattern("begin")) {
  155.                                     if(posicionEvent1.getType().equals("Agresor")) agresorList.add(posicionEvent1);
  156.                                     else if(posicionEvent1.getType().equals("Victima")) victimaList.add(posicionEvent1);
  157.                                 }
  158.                                 if(posicionEvent.getType().equals("Agresor")) agresorList.add(posicionEvent);
  159.                                 else if(posicionEvent.getType().equals("Victima")) victimaList.add(posicionEvent);
  160.  
  161.                                 if(victimaList.size()==2 && agresorList.size()==2) {
  162.                                     if (Utils.DetectandPrevent(
  163.                                             victimaList.get(0).getLatitude(), victimaList.get(0).getLongitude(),
  164.                                             victimaList.get(1).getLatitude(), victimaList.get(1).getLongitude(),
  165.                                             agresorList.get(0).getLatitude(), agresorList.get(0).getLongitude(),
  166.                                             agresorList.get(1).getLatitude(), agresorList.get(1).getLongitude())) {
  167.                                         return true;
  168.                                     }
  169.                                 }
  170.                                 return false;
  171.                             }
  172.                         }).within(Time.seconds(timeSentSec*5/2));
  173.  
  174.         PatternStream<PosicionEvent> patternStreamAlarm = CEP.pattern(sensadoInput0, alarmaPrevencion);
  175.         DataStream<Alarma> alertaAlarma = patternStreamAlarm.select(new PatternSelectFunction<PosicionEvent, Alarma>() {
  176.             @Override
  177.             public Alarma select(Map<String, List<PosicionEvent>> map) throws Exception {
  178.                 ArrayList<PosicionEvent> victimaList = new ArrayList<>();
  179.                 ArrayList<PosicionEvent> agresorList = new ArrayList<>();
  180.                 for (PosicionEvent positionEvent : Lists.newArrayList(map.get("begin").listIterator())) {
  181.                     if (positionEvent instanceof Agresor) agresorList.add(positionEvent);
  182.                     else if (positionEvent instanceof Victima) victimaList.add(positionEvent);
  183.                 }
  184.                 PosicionEvent posicionEvent = map.get("followed").get(0);
  185.                 if(posicionEvent instanceof Agresor) agresorList.add(posicionEvent);
  186.                 else if(posicionEvent instanceof Victima) victimaList.add(posicionEvent);
  187.                 return new Alarma(victimaList,agresorList);
  188.             }
  189.         });
  190.  
  191.  
  192.         Pattern<PosicionEvent,?> alarma3 = Pattern.<PosicionEvent>
  193. //                begin("PrimerosSensados",AfterMatchSkipStrategy.skipPastLastEvent())
  194.                 begin("PrimerosSensados",AfterMatchSkipStrategy.skipToLast("PrimerosSensados"))
  195.                 .times(7).consecutive()//
  196.                 .next("SegundoSensado").where(new IterativeCondition<PosicionEvent>() {
  197.                     @Override
  198.                     public boolean filter(PosicionEvent posicionEvent, Context<PosicionEvent> context) throws Exception {
  199.                         ArrayList<PosicionEvent> victimaList = new ArrayList<>();
  200.                         ArrayList<PosicionEvent> agresorList = new ArrayList<>();
  201.                         for (PosicionEvent posicionEvent1 : context.getEventsForPattern("PrimerosSensados")) {
  202.                             if(posicionEvent1.getType().equals("Agresor")) agresorList.add(posicionEvent1);
  203.                             else if(posicionEvent1.getType().equals("Victima")) victimaList.add(posicionEvent1);
  204.                         }
  205.                         if(posicionEvent.getType().equals("Agresor")) agresorList.add(posicionEvent);
  206.                         else if(posicionEvent.getType().equals("Victima")) victimaList.add(posicionEvent);
  207.  
  208.                         if(victimaList.size()>1 && agresorList.size()>1){
  209.                             int sizeVictim = victimaList.size();
  210.                             int sizeAgresor = agresorList.size();
  211.                             for(int i=sizeAgresor-2;i>-1;i--)
  212.                                 if(Utils.areClose(victimaList.get(sizeVictim-1).getLatitude(), victimaList.get(sizeVictim-1).getLongitude(),
  213.                                         agresorList.get(i).getLatitude(), agresorList.get(i).getLongitude())){
  214.                                     return true;
  215.                                 }
  216.                             for(int i=sizeVictim-2;i>-1;i--)
  217.                                 if(Utils.areClose(victimaList.get(i).getLatitude(), victimaList.get(i).getLongitude(),
  218.                                         agresorList.get(sizeAgresor-1).getLatitude(), agresorList.get(sizeAgresor-1).getLongitude())){
  219.                                     return true;
  220.                                 }
  221.                         }
  222.                         return false;
  223.                     }
  224.                 })
  225.                 .within(Time.seconds(timeSentSec*7/2));
  226.  
  227.         PatternStream<PosicionEvent> patternStreamAlarm3 = CEP.pattern(sensadoInput0, alarma3);
  228.         DataStream<Alarma> alertaAlarma3 = patternStreamAlarm3.select(new PatternSelectFunction<PosicionEvent, Alarma>() {
  229.             @Override
  230.             public Alarma select(Map<String, List<PosicionEvent>> map) throws Exception {
  231.                 ArrayList<PosicionEvent> victimaList = new ArrayList<>();
  232.                 ArrayList<PosicionEvent> agresorList = new ArrayList<>();
  233.                 for (PosicionEvent positionEvent : Lists.newArrayList(map.get("PrimerosSensados").listIterator()))
  234.                     if(positionEvent.getType().equals("Agresor")) agresorList.add(positionEvent);
  235.                     else if(positionEvent instanceof Victima) victimaList.add(positionEvent);
  236.                 for (PosicionEvent positionEvent : Lists.newArrayList(map.get("SegundoSensado").listIterator()))
  237.                     if(positionEvent instanceof Agresor) agresorList.add(positionEvent);
  238.                     else if(positionEvent instanceof Victima) victimaList.add(positionEvent);
  239.  
  240.                 int sizeVictim = victimaList.size();
  241.                 int sizeAgresor = agresorList.size();
  242.                 for(int i=sizeAgresor-2;i>-1;i--)
  243.                     if(Utils.areClose(victimaList.get(sizeVictim-1).getLatitude(), victimaList.get(sizeVictim-1).getLongitude(),
  244.                             agresorList.get(i).getLatitude(), agresorList.get(i).getLongitude())){
  245.                         return new Alarma(victimaList.get(sizeVictim-1),agresorList.get(sizeAgresor-1),agresorList.get(i),sizeVictim+"_"+sizeAgresor);
  246.                     }
  247.                 for(int i=sizeVictim-2;i>-1;i--)
  248.                     if(Utils.areClose(victimaList.get(i).getLatitude(), victimaList.get(i).getLongitude(),
  249.                             agresorList.get(sizeAgresor-1).getLatitude(), agresorList.get(sizeAgresor-1).getLongitude())){
  250.                         return new Alarma(victimaList.get(sizeVictim-1), agresorList.get(sizeAgresor-1),victimaList.get(i),sizeVictim+"_"+sizeAgresor);
  251.                     }
  252.                 System.out.println("ERROR NO HAY SEGUIMIENTO...");
  253.  
  254.                 return new Alarma(victimaList.get(sizeVictim-1), agresorList.get(sizeAgresor-1));
  255.             }
  256.         });
  257. */
  258.         env.execute("GioCEP for Savia - Smartcity ");
  259.  
  260.     }
  261.  
  262.     public static void printArraySEs(ArrayList<PosicionEvent> victimaList,ArrayList<PosicionEvent> agresorList, int seg, int v,int a){
  263.         String ele="";
  264.         for(int i=0;i<victimaList.size();i++)
  265.             ele +=victimaList.get(i).getDate()+" , ";
  266.         System.out.println("Vic"+seg+":"+victimaList.size());
  267.         ele="";
  268.         for(int i=0;i<agresorList.size();i++)
  269.             ele +=agresorList.get(i).getDate()+" , ";
  270.         System.out.println("Agre"+seg+":"+agresorList.size());
  271.         System.out.println(victimaList.get(v).getDate()+"_"+agresorList.get(a).getDate());
  272.     }
  273.  
  274.     public static final class MisElementos implements MapFunction<String,PosicionEvent> {
  275.         @Override
  276.         public PosicionEvent map(String contentAsString) throws Exception {
  277.             try {
  278.                 JSONObject jsonObject = new JSONObject(contentAsString);
  279.                 String type =jsonObject.getString("type");
  280.                 if (type.equals("Agresor"))             return new Agresor(jsonObject);
  281.                 else if(type.equals("Victima"))       return new Victima(jsonObject);
  282.  
  283.                 else return new PosicionEvent();
  284.             } catch (JSONException e) {
  285.                 System.out.println("No es JSON:"+contentAsString+"_"+e.toString() );
  286.                 //  +"counterAlarm:"+counter+"_counterMQTT:"+counterMQTT+"_counterSocket:");
  287.                 return new PosicionEvent();
  288.             }
  289.         }
  290.     }
  291.  
  292.     public static class MiMQTTSource implements SourceFunction<PosicionEvent> {
  293.         private String user = "CTIC-SMARTCITY";
  294.         private String password = "YTICTRAMS-CITC";
  295.         private volatile boolean isRunning = true;
  296.         @Override
  297.         public void run(SourceContext<PosicionEvent> ctx) throws Exception {
  298.             MqttClient client;
  299.             try {
  300.                 client = new MqttClient("tcp://localhost", MqttClient.generateClientId());
  301.                 MqttConnectOptions options = new MqttConnectOptions();
  302.                 options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
  303.                 options.setUserName(user);
  304.                 options.setPassword(password.toCharArray());
  305.                 options.setCleanSession(false);
  306.                 options.setAutomaticReconnect(true);
  307.                 client.setCallback(new MqttCallback() {
  308.                     @Override
  309.                     public void connectionLost(Throwable throwable) {
  310.                         System.out.println("Desconectado MQTT del Socket! "+new SimpleDateFormat("HH:mm:ss").format(new Date()));
  311.                     }
  312.                     @Override
  313.                     public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
  314.                         System.out.println("topic: "+topic);
  315.                         String re_type[] = topic.split("/");
  316.                         try {
  317.                             JSONObject jsonObject = new JSONObject(new String(mqttMessage.getPayload()));
  318.                             if (re_type[1].equals("Agresor")){
  319.                                 ctx.collect(new Agresor(jsonObject,re_type[0],re_type[1]));
  320.                             }
  321.                             else if (re_type[1].equals("Victima")){
  322.                                 ctx.collect(new Victima(jsonObject,re_type[0],re_type[1]));
  323.                             }
  324.                         }catch (JSONException e) {
  325.                             System.out.println("No es JSON:"+new String(mqttMessage.getPayload())+"_"+e.toString() );
  326.                             //+"counterSegu:"+ParejasSeguimiento+"_counterMQTT:"+counterMQTT);
  327.                         }
  328.                     }
  329.  
  330.                     @Override
  331.                     public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
  332.  
  333.                     }
  334.                 });
  335.                 client.connect(options);
  336.                 client.subscribe("+/+");
  337.             } catch (MqttException e) {
  338.                 System.out.println("Expcecion hilo "+e.toString());
  339.             }
  340.             while (!Thread.currentThread().isInterrupted() && isRunning) {
  341.                 try {
  342.                     sleep(Integer.MAX_VALUE);
  343.                 }catch (InterruptedException ex) {
  344.                     Thread.currentThread().interrupt();
  345.                 }
  346.             }
  347.         }
  348.  
  349.         @Override
  350.         public void cancel() {
  351.             isRunning = false;
  352.         }
  353.     }
  354.  
  355.     public static long StringDatetoLong(String mdate){
  356.         Date date=new Date();
  357.         try {                                 //2018-04-13 18:28:43
  358.             date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(mdate);
  359.         } catch (ParseException e) {
  360.             System.out.println("Error parsing DATE "+e.toString());
  361.             //e.printStackTrace();
  362.         }
  363.         Timestamp timestamp = new Timestamp(date.getTime());
  364.         return timestamp.getTime();
  365.     }
  366. }
  367. import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
  368. import org.apache.flink.streaming.api.functions.ProcessFunction;
  369. import org.apache.flink.streaming.api.functions.source.SourceFunction;
  370. import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
  371. import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
  372. import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
  373. import org.apache.flink.streaming.api.watermark.Watermark;
  374. import org.apache.flink.streaming.api.windowing.assigners.SlidingAlignedProcessingTimeWindows;
  375. import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
  376. import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
  377. import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
  378. import org.apache.flink.streaming.api.windowing.time.Time;
  379. import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
  380. import org.apache.flink.util.Collector;
  381. import org.eclipse.paho.client.mqttv3.*;
  382. import org.json.JSONException;
  383. import org.json.JSONObject;
  384.  
  385.  
  386. import javax.annotation.Nullable;
  387. import java.net.ServerSocket;
  388. import java.sql.Timestamp;
  389. import java.text.DateFormat;
  390. import java.text.ParseException;
  391. import java.text.SimpleDateFormat;
  392. import java.util.*;
  393.  
  394. import static java.lang.Thread.sleep;
  395.  
  396. public class CEPMonitor {
  397.  
  398.     public static final int timeSentSec = 60;
  399.  
  400.     public static void main(String[] args) throws Exception {
  401.         final StreamExecutionEnvironment env = StreamExecutionEnvironment
  402.                 .getExecutionEnvironment()
  403.                 ;
  404.  
  405.         DataStream<PosicionEvent> sensadoInput0 = env.addSource(new MiMQTTSource())
  406.                 .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<PosicionEvent>() {
  407.                     @Override
  408.                     public long extractAscendingTimestamp(PosicionEvent element) {
  409.                         return StringDatetoLong(element.getDate());
  410.                     }
  411.  
  412.                 })
  413.                 .keyBy(new KeySelector<PosicionEvent, String>() {
  414.                     @Override
  415.                     public String getKey(PosicionEvent posicionEvent) throws Exception {
  416.                         return posicionEvent.getRelacion();
  417.                     }
  418.                 })
  419.                 ;
  420.  
  421.         Pattern<PosicionEvent,?> alarmaClose = Pattern.<PosicionEvent>
  422.                 begin("FirstSensadoEvent")
  423.                 .followedBy("SecondSensadoEvent").where(
  424.                         new IterativeCondition<PosicionEvent>(){
  425.                             @Override
  426.                             public boolean filter(PosicionEvent posicionEvent, Context<PosicionEvent> context) throws Exception {
  427.                                 String typePreviuos="";
  428.                                 for (PosicionEvent mposicionevent : context.getEventsForPattern("FirstSensadoEvent")){
  429.                                     typePreviuos= mposicionevent.getType();
  430.                                     if((typePreviuos.equals("Victima") && posicionEvent.getType().equals("Agresor"))
  431.                                             || (typePreviuos.equals("Agresor") && posicionEvent.getType().equals("Victima"))){
  432.                                         if(Utils.areClose(mposicionevent.getLatitude(),mposicionevent.getLongitude(),
  433.                                                 posicionEvent.getLatitude(),posicionEvent.getLongitude())){
  434.                                             return true;
  435.                                         }
  436.                                     }
  437.                                     return false;
  438.                                 }
  439.                                 return false;
  440.                             }
  441.                         }
  442.                 ).within(Time.seconds(timeSentSec*3/2));
  443.  
  444.         PatternStream<PosicionEvent> patternStreamClose = CEP.pattern(sensadoInput0, alarmaClose);
  445.         DataStream<Alarma> alertsClose = patternStreamClose.select(new PatternSelectFunction<PosicionEvent, Alarma>() {
  446.             @Override
  447.             public Alarma select(Map<String, List<PosicionEvent>> map) throws Exception {
  448.                 PosicionEvent posicionEvent = map.get("FirstSensadoEvent").get(0);
  449.                 PosicionEvent posicionEvent1 = map.get("SecondSensadoEvent").get(0);
  450.                 //addcounter();
  451.                 if(posicionEvent.getType().equals("Victima"))
  452.                     return new Alarma(posicionEvent,posicionEvent1);
  453.                 else return new Alarma(posicionEvent1,posicionEvent);
  454.             }
  455.         });
  456. /*
  457.         Pattern<PosicionEvent,?> alarmaPrevencion = Pattern.<PosicionEvent>
  458.                 begin("begin").times(3)//.allowCombinations()//.consecutive()
  459.                 .followedBy("followed").where(
  460.                         new IterativeCondition <PosicionEvent>(){
  461.                             @Override
  462.                             public boolean filter(PosicionEvent posicionEvent, Context<PosicionEvent> context) throws Exception {
  463.                                 ArrayList<PosicionEvent> victimaList = new ArrayList<>();
  464.                                 ArrayList<PosicionEvent> agresorList = new ArrayList<>();
  465.                                 for (PosicionEvent posicionEvent1 : context.getEventsForPattern("begin")) {
  466.                                     if(posicionEvent1.getType().equals("Agresor")) agresorList.add(posicionEvent1);
  467.                                     else if(posicionEvent1.getType().equals("Victima")) victimaList.add(posicionEvent1);
  468.                                 }
  469.                                 if(posicionEvent.getType().equals("Agresor")) agresorList.add(posicionEvent);
  470.                                 else if(posicionEvent.getType().equals("Victima")) victimaList.add(posicionEvent);
  471.  
  472.                                 if(victimaList.size()==2 && agresorList.size()==2) {
  473.                                     if (Utils.DetectandPrevent(
  474.                                             victimaList.get(0).getLatitude(), victimaList.get(0).getLongitude(),
  475.                                             victimaList.get(1).getLatitude(), victimaList.get(1).getLongitude(),
  476.                                             agresorList.get(0).getLatitude(), agresorList.get(0).getLongitude(),
  477.                                             agresorList.get(1).getLatitude(), agresorList.get(1).getLongitude())) {
  478.                                         return true;
  479.                                     }
  480.                                 }
  481.                                 return false;
  482.                             }
  483.                         }).within(Time.seconds(timeSentSec*5/2));
  484.  
  485.         PatternStream<PosicionEvent> patternStreamAlarm = CEP.pattern(sensadoInput0, alarmaPrevencion);
  486.         DataStream<Alarma> alertaAlarma = patternStreamAlarm.select(new PatternSelectFunction<PosicionEvent, Alarma>() {
  487.             @Override
  488.             public Alarma select(Map<String, List<PosicionEvent>> map) throws Exception {
  489.                 ArrayList<PosicionEvent> victimaList = new ArrayList<>();
  490.                 ArrayList<PosicionEvent> agresorList = new ArrayList<>();
  491.                 for (PosicionEvent positionEvent : Lists.newArrayList(map.get("begin").listIterator())) {
  492.                     if (positionEvent instanceof Agresor) agresorList.add(positionEvent);
  493.                     else if (positionEvent instanceof Victima) victimaList.add(positionEvent);
  494.                 }
  495.                 PosicionEvent posicionEvent = map.get("followed").get(0);
  496.                 if(posicionEvent instanceof Agresor) agresorList.add(posicionEvent);
  497.                 else if(posicionEvent instanceof Victima) victimaList.add(posicionEvent);
  498.                 return new Alarma(victimaList,agresorList);
  499.             }
  500.         });
  501.  
  502.  
  503.         Pattern<PosicionEvent,?> alarma3 = Pattern.<PosicionEvent>
  504. //                begin("PrimerosSensados",AfterMatchSkipStrategy.skipPastLastEvent())
  505.                 begin("PrimerosSensados",AfterMatchSkipStrategy.skipToLast("PrimerosSensados"))
  506.                 .times(7).consecutive()//
  507.                 .next("SegundoSensado").where(new IterativeCondition<PosicionEvent>() {
  508.                     @Override
  509.                     public boolean filter(PosicionEvent posicionEvent, Context<PosicionEvent> context) throws Exception {
  510.                         ArrayList<PosicionEvent> victimaList = new ArrayList<>();
  511.                         ArrayList<PosicionEvent> agresorList = new ArrayList<>();
  512.                         for (PosicionEvent posicionEvent1 : context.getEventsForPattern("PrimerosSensados")) {
  513.                             if(posicionEvent1.getType().equals("Agresor")) agresorList.add(posicionEvent1);
  514.                             else if(posicionEvent1.getType().equals("Victima")) victimaList.add(posicionEvent1);
  515.                         }
  516.                         if(posicionEvent.getType().equals("Agresor")) agresorList.add(posicionEvent);
  517.                         else if(posicionEvent.getType().equals("Victima")) victimaList.add(posicionEvent);
  518.  
  519.                         if(victimaList.size()>1 && agresorList.size()>1){
  520.                             int sizeVictim = victimaList.size();
  521.                             int sizeAgresor = agresorList.size();
  522.                             for(int i=sizeAgresor-2;i>-1;i--)
  523.                                 if(Utils.areClose(victimaList.get(sizeVictim-1).getLatitude(), victimaList.get(sizeVictim-1).getLongitude(),
  524.                                         agresorList.get(i).getLatitude(), agresorList.get(i).getLongitude())){
  525.                                     return true;
  526.                                 }
  527.                             for(int i=sizeVictim-2;i>-1;i--)
  528.                                 if(Utils.areClose(victimaList.get(i).getLatitude(), victimaList.get(i).getLongitude(),
  529.                                         agresorList.get(sizeAgresor-1).getLatitude(), agresorList.get(sizeAgresor-1).getLongitude())){
  530.                                     return true;
  531.                                 }
  532.                         }
  533.                         return false;
  534.                     }
  535.                 })
  536.                 .within(Time.seconds(timeSentSec*7/2));
  537.  
  538.         PatternStream<PosicionEvent> patternStreamAlarm3 = CEP.pattern(sensadoInput0, alarma3);
  539.         DataStream<Alarma> alertaAlarma3 = patternStreamAlarm3.select(new PatternSelectFunction<PosicionEvent, Alarma>() {
  540.             @Override
  541.             public Alarma select(Map<String, List<PosicionEvent>> map) throws Exception {
  542.                 ArrayList<PosicionEvent> victimaList = new ArrayList<>();
  543.                 ArrayList<PosicionEvent> agresorList = new ArrayList<>();
  544.                 for (PosicionEvent positionEvent : Lists.newArrayList(map.get("PrimerosSensados").listIterator()))
  545.                     if(positionEvent.getType().equals("Agresor")) agresorList.add(positionEvent);
  546.                     else if(positionEvent instanceof Victima) victimaList.add(positionEvent);
  547.                 for (PosicionEvent positionEvent : Lists.newArrayList(map.get("SegundoSensado").listIterator()))
  548.                     if(positionEvent instanceof Agresor) agresorList.add(positionEvent);
  549.                     else if(positionEvent instanceof Victima) victimaList.add(positionEvent);
  550.  
  551.                 int sizeVictim = victimaList.size();
  552.                 int sizeAgresor = agresorList.size();
  553.                 for(int i=sizeAgresor-2;i>-1;i--)
  554.                     if(Utils.areClose(victimaList.get(sizeVictim-1).getLatitude(), victimaList.get(sizeVictim-1).getLongitude(),
  555.                             agresorList.get(i).getLatitude(), agresorList.get(i).getLongitude())){
  556.                         return new Alarma(victimaList.get(sizeVictim-1),agresorList.get(sizeAgresor-1),agresorList.get(i),sizeVictim+"_"+sizeAgresor);
  557.                     }
  558.                 for(int i=sizeVictim-2;i>-1;i--)
  559.                     if(Utils.areClose(victimaList.get(i).getLatitude(), victimaList.get(i).getLongitude(),
  560.                             agresorList.get(sizeAgresor-1).getLatitude(), agresorList.get(sizeAgresor-1).getLongitude())){
  561.                         return new Alarma(victimaList.get(sizeVictim-1), agresorList.get(sizeAgresor-1),victimaList.get(i),sizeVictim+"_"+sizeAgresor);
  562.                     }
  563.                 System.out.println("ERROR NO HAY SEGUIMIENTO...");
  564.  
  565.                 return new Alarma(victimaList.get(sizeVictim-1), agresorList.get(sizeAgresor-1));
  566.             }
  567.         });
  568. */
  569.         env.execute("GioCEP for Savia - Smartcity ");
  570.  
  571.     }
  572.  
  573.     public static void printArraySEs(ArrayList<PosicionEvent> victimaList,ArrayList<PosicionEvent> agresorList, int seg, int v,int a){
  574.         String ele="";
  575.         for(int i=0;i<victimaList.size();i++)
  576.             ele +=victimaList.get(i).getDate()+" , ";
  577.         System.out.println("Vic"+seg+":"+victimaList.size());
  578.         ele="";
  579.         for(int i=0;i<agresorList.size();i++)
  580.             ele +=agresorList.get(i).getDate()+" , ";
  581.         System.out.println("Agre"+seg+":"+agresorList.size());
  582.         System.out.println(victimaList.get(v).getDate()+"_"+agresorList.get(a).getDate());
  583.     }
  584.  
  585.     public static final class MisElementos implements MapFunction<String,PosicionEvent> {
  586.         @Override
  587.         public PosicionEvent map(String contentAsString) throws Exception {
  588.             try {
  589.                 JSONObject jsonObject = new JSONObject(contentAsString);
  590.                 String type =jsonObject.getString("type");
  591.                 if (type.equals("Agresor"))             return new Agresor(jsonObject);
  592.                 else if(type.equals("Victima"))       return new Victima(jsonObject);
  593.  
  594.                 else return new PosicionEvent();
  595.             } catch (JSONException e) {
  596.                 System.out.println("No es JSON:"+contentAsString+"_"+e.toString() );
  597.                 //  +"counterAlarm:"+counter+"_counterMQTT:"+counterMQTT+"_counterSocket:");
  598.                 return new PosicionEvent();
  599.             }
  600.         }
  601.     }
  602.  
  603.     public static class MiMQTTSource implements SourceFunction<PosicionEvent> {
  604.         private String user = "CTIC-SMARTCITY";
  605.         private String password = "YTICTRAMS-CITC";
  606.         private volatile boolean isRunning = true;
  607.         @Override
  608.         public void run(SourceContext<PosicionEvent> ctx) throws Exception {
  609.             MqttClient client;
  610.             try {
  611.                 client = new MqttClient("tcp://localhost", MqttClient.generateClientId());
  612.                 MqttConnectOptions options = new MqttConnectOptions();
  613.                 options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
  614.                 options.setUserName(user);
  615.                 options.setPassword(password.toCharArray());
  616.                 options.setCleanSession(false);
  617.                 options.setAutomaticReconnect(true);
  618.                 client.setCallback(new MqttCallback() {
  619.                     @Override
  620.                     public void connectionLost(Throwable throwable) {
  621.                         System.out.println("Desconectado MQTT del Socket! "+new SimpleDateFormat("HH:mm:ss").format(new Date()));
  622.                     }
  623.                     @Override
  624.                     public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
  625.                         System.out.println("topic: "+topic);
  626.                         String re_type[] = topic.split("/");
  627.                         try {
  628.                             JSONObject jsonObject = new JSONObject(new String(mqttMessage.getPayload()));
  629.                             if (re_type[1].equals("Agresor")){
  630.                                 ctx.collect(new Agresor(jsonObject,re_type[0],re_type[1]));
  631.                             }
  632.                             else if (re_type[1].equals("Victima")){
  633.                                 ctx.collect(new Victima(jsonObject,re_type[0],re_type[1]));
  634.                             }
  635.                         }catch (JSONException e) {
  636.                             System.out.println("No es JSON:"+new String(mqttMessage.getPayload())+"_"+e.toString() );
  637.                             //+"counterSegu:"+ParejasSeguimiento+"_counterMQTT:"+counterMQTT);
  638.                         }
  639.                     }
  640.  
  641.                     @Override
  642.                     public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
  643.  
  644.                     }
  645.                 });
  646.                 client.connect(options);
  647.                 client.subscribe("+/+");
  648.             } catch (MqttException e) {
  649.                 System.out.println("Expcecion hilo "+e.toString());
  650.             }
  651.             while (!Thread.currentThread().isInterrupted() && isRunning) {
  652.                 try {
  653.                     sleep(Integer.MAX_VALUE);
  654.                 }catch (InterruptedException ex) {
  655.                     Thread.currentThread().interrupt();
  656.                 }
  657.             }
  658.         }
  659.  
  660.         @Override
  661.         public void cancel() {
  662.             isRunning = false;
  663.         }
  664.     }
  665.  
  666.     public static long StringDatetoLong(String mdate){
  667.         Date date=new Date();
  668.         try {                                 //2018-04-13 18:28:43
  669.             date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(mdate);
  670.         } catch (ParseException e) {
  671.             System.out.println("Error parsing DATE "+e.toString());
  672.             //e.printStackTrace();
  673.         }
  674.         Timestamp timestamp = new Timestamp(date.getTime());
  675.         return timestamp.getTime();
  676.     }
  677. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement