Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package ctic.smartcity.savia;
- import ctic.smartcity.savia.Data.Agresor;
- import ctic.smartcity.savia.Data.Alarma;
- import ctic.smartcity.savia.Data.Comparador;
- import ctic.smartcity.savia.Data.Victima;
- import org.apache.flink.api.common.functions.MapFunction;
- import org.apache.flink.api.common.functions.Partitioner;
- import org.apache.flink.api.common.state.ValueState;
- import org.apache.flink.api.common.state.ValueStateDescriptor;
- import org.apache.flink.api.common.typeinfo.TypeHint;
- import org.apache.flink.api.common.typeinfo.TypeInformation;
- import org.apache.flink.api.java.functions.KeySelector;
- import org.apache.flink.cep.CEP;
- import org.apache.flink.cep.PatternSelectFunction;
- import org.apache.flink.cep.PatternStream;
- import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
- import org.apache.flink.cep.pattern.Pattern;
- import org.apache.flink.cep.pattern.conditions.IterativeCondition;
- import org.apache.flink.configuration.Configuration;
- import org.apache.flink.shaded.curator.org.apache.curator.shaded.com.google.common.collect.Lists;
- import org.apache.flink.streaming.api.TimerService;
- import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
- import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
- import org.apache.flink.streaming.api.functions.ProcessFunction;
- import org.apache.flink.streaming.api.functions.source.SourceFunction;
- import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
- import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
- import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
- import org.apache.flink.streaming.api.watermark.Watermark;
- import org.apache.flink.streaming.api.windowing.assigners.SlidingAlignedProcessingTimeWindows;
- import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
- import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
- import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
- import org.apache.flink.streaming.api.windowing.time.Time;
- import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
- import org.apache.flink.util.Collector;
- import org.eclipse.paho.client.mqttv3.*;
- import org.json.JSONException;
- import org.json.JSONObject;
- import javax.annotation.Nullable;
- import java.net.ServerSocket;
- import java.sql.Timestamp;
- import java.text.DateFormat;
- import java.text.ParseException;
- import java.text.SimpleDateFormat;
- import java.util.*;
- import static java.lang.Thread.sleep;
- public class CEPMonitor {
- public static final int timeSentSec = 60;
- public static void main(String[] args) throws Exception {
- final StreamExecutionEnvironment env = StreamExecutionEnvironment
- .getExecutionEnvironment()
- ;
- DataStream<PosicionEvent> sensadoInput0 = env.addSource(new MiMQTTSource())
- .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<PosicionEvent>() {
- @Override
- public long extractAscendingTimestamp(PosicionEvent element) {
- return StringDatetoLong(element.getDate());
- }
- })
- .keyBy(new KeySelector<PosicionEvent, String>() {
- @Override
- public String getKey(PosicionEvent posicionEvent) throws Exception {
- return posicionEvent.getRelacion();
- }
- })
- ;
- Pattern<PosicionEvent,?> alarmaClose = Pattern.<PosicionEvent>
- begin("FirstSensadoEvent")
- .followedBy("SecondSensadoEvent").where(
- new IterativeCondition<PosicionEvent>(){
- @Override
- public boolean filter(PosicionEvent posicionEvent, Context<PosicionEvent> context) throws Exception {
- String typePreviuos="";
- for (PosicionEvent mposicionevent : context.getEventsForPattern("FirstSensadoEvent")){
- typePreviuos= mposicionevent.getType();
- if((typePreviuos.equals("Victima") && posicionEvent.getType().equals("Agresor"))
- || (typePreviuos.equals("Agresor") && posicionEvent.getType().equals("Victima"))){
- if(Utils.areClose(mposicionevent.getLatitude(),mposicionevent.getLongitude(),
- posicionEvent.getLatitude(),posicionEvent.getLongitude())){
- return true;
- }
- }
- return false;
- }
- return false;
- }
- }
- ).within(Time.seconds(timeSentSec*3/2));
- PatternStream<PosicionEvent> patternStreamClose = CEP.pattern(sensadoInput0, alarmaClose);
- DataStream<Alarma> alertsClose = patternStreamClose.select(new PatternSelectFunction<PosicionEvent, Alarma>() {
- @Override
- public Alarma select(Map<String, List<PosicionEvent>> map) throws Exception {
- PosicionEvent posicionEvent = map.get("FirstSensadoEvent").get(0);
- PosicionEvent posicionEvent1 = map.get("SecondSensadoEvent").get(0);
- //addcounter();
- if(posicionEvent.getType().equals("Victima"))
- return new Alarma(posicionEvent,posicionEvent1);
- else return new Alarma(posicionEvent1,posicionEvent);
- }
- });
- /*
- Pattern<PosicionEvent,?> alarmaPrevencion = Pattern.<PosicionEvent>
- begin("begin").times(3)//.allowCombinations()//.consecutive()
- .followedBy("followed").where(
- new IterativeCondition <PosicionEvent>(){
- @Override
- public boolean filter(PosicionEvent posicionEvent, Context<PosicionEvent> context) throws Exception {
- ArrayList<PosicionEvent> victimaList = new ArrayList<>();
- ArrayList<PosicionEvent> agresorList = new ArrayList<>();
- for (PosicionEvent posicionEvent1 : context.getEventsForPattern("begin")) {
- if(posicionEvent1.getType().equals("Agresor")) agresorList.add(posicionEvent1);
- else if(posicionEvent1.getType().equals("Victima")) victimaList.add(posicionEvent1);
- }
- if(posicionEvent.getType().equals("Agresor")) agresorList.add(posicionEvent);
- else if(posicionEvent.getType().equals("Victima")) victimaList.add(posicionEvent);
- if(victimaList.size()==2 && agresorList.size()==2) {
- if (Utils.DetectandPrevent(
- victimaList.get(0).getLatitude(), victimaList.get(0).getLongitude(),
- victimaList.get(1).getLatitude(), victimaList.get(1).getLongitude(),
- agresorList.get(0).getLatitude(), agresorList.get(0).getLongitude(),
- agresorList.get(1).getLatitude(), agresorList.get(1).getLongitude())) {
- return true;
- }
- }
- return false;
- }
- }).within(Time.seconds(timeSentSec*5/2));
- PatternStream<PosicionEvent> patternStreamAlarm = CEP.pattern(sensadoInput0, alarmaPrevencion);
- DataStream<Alarma> alertaAlarma = patternStreamAlarm.select(new PatternSelectFunction<PosicionEvent, Alarma>() {
- @Override
- public Alarma select(Map<String, List<PosicionEvent>> map) throws Exception {
- ArrayList<PosicionEvent> victimaList = new ArrayList<>();
- ArrayList<PosicionEvent> agresorList = new ArrayList<>();
- for (PosicionEvent positionEvent : Lists.newArrayList(map.get("begin").listIterator())) {
- if (positionEvent instanceof Agresor) agresorList.add(positionEvent);
- else if (positionEvent instanceof Victima) victimaList.add(positionEvent);
- }
- PosicionEvent posicionEvent = map.get("followed").get(0);
- if(posicionEvent instanceof Agresor) agresorList.add(posicionEvent);
- else if(posicionEvent instanceof Victima) victimaList.add(posicionEvent);
- return new Alarma(victimaList,agresorList);
- }
- });
- Pattern<PosicionEvent,?> alarma3 = Pattern.<PosicionEvent>
- // begin("PrimerosSensados",AfterMatchSkipStrategy.skipPastLastEvent())
- begin("PrimerosSensados",AfterMatchSkipStrategy.skipToLast("PrimerosSensados"))
- .times(7).consecutive()//
- .next("SegundoSensado").where(new IterativeCondition<PosicionEvent>() {
- @Override
- public boolean filter(PosicionEvent posicionEvent, Context<PosicionEvent> context) throws Exception {
- ArrayList<PosicionEvent> victimaList = new ArrayList<>();
- ArrayList<PosicionEvent> agresorList = new ArrayList<>();
- for (PosicionEvent posicionEvent1 : context.getEventsForPattern("PrimerosSensados")) {
- if(posicionEvent1.getType().equals("Agresor")) agresorList.add(posicionEvent1);
- else if(posicionEvent1.getType().equals("Victima")) victimaList.add(posicionEvent1);
- }
- if(posicionEvent.getType().equals("Agresor")) agresorList.add(posicionEvent);
- else if(posicionEvent.getType().equals("Victima")) victimaList.add(posicionEvent);
- if(victimaList.size()>1 && agresorList.size()>1){
- int sizeVictim = victimaList.size();
- int sizeAgresor = agresorList.size();
- for(int i=sizeAgresor-2;i>-1;i--)
- if(Utils.areClose(victimaList.get(sizeVictim-1).getLatitude(), victimaList.get(sizeVictim-1).getLongitude(),
- agresorList.get(i).getLatitude(), agresorList.get(i).getLongitude())){
- return true;
- }
- for(int i=sizeVictim-2;i>-1;i--)
- if(Utils.areClose(victimaList.get(i).getLatitude(), victimaList.get(i).getLongitude(),
- agresorList.get(sizeAgresor-1).getLatitude(), agresorList.get(sizeAgresor-1).getLongitude())){
- return true;
- }
- }
- return false;
- }
- })
- .within(Time.seconds(timeSentSec*7/2));
- PatternStream<PosicionEvent> patternStreamAlarm3 = CEP.pattern(sensadoInput0, alarma3);
- DataStream<Alarma> alertaAlarma3 = patternStreamAlarm3.select(new PatternSelectFunction<PosicionEvent, Alarma>() {
- @Override
- public Alarma select(Map<String, List<PosicionEvent>> map) throws Exception {
- ArrayList<PosicionEvent> victimaList = new ArrayList<>();
- ArrayList<PosicionEvent> agresorList = new ArrayList<>();
- for (PosicionEvent positionEvent : Lists.newArrayList(map.get("PrimerosSensados").listIterator()))
- if(positionEvent.getType().equals("Agresor")) agresorList.add(positionEvent);
- else if(positionEvent instanceof Victima) victimaList.add(positionEvent);
- for (PosicionEvent positionEvent : Lists.newArrayList(map.get("SegundoSensado").listIterator()))
- if(positionEvent instanceof Agresor) agresorList.add(positionEvent);
- else if(positionEvent instanceof Victima) victimaList.add(positionEvent);
- int sizeVictim = victimaList.size();
- int sizeAgresor = agresorList.size();
- for(int i=sizeAgresor-2;i>-1;i--)
- if(Utils.areClose(victimaList.get(sizeVictim-1).getLatitude(), victimaList.get(sizeVictim-1).getLongitude(),
- agresorList.get(i).getLatitude(), agresorList.get(i).getLongitude())){
- return new Alarma(victimaList.get(sizeVictim-1),agresorList.get(sizeAgresor-1),agresorList.get(i),sizeVictim+"_"+sizeAgresor);
- }
- for(int i=sizeVictim-2;i>-1;i--)
- if(Utils.areClose(victimaList.get(i).getLatitude(), victimaList.get(i).getLongitude(),
- agresorList.get(sizeAgresor-1).getLatitude(), agresorList.get(sizeAgresor-1).getLongitude())){
- return new Alarma(victimaList.get(sizeVictim-1), agresorList.get(sizeAgresor-1),victimaList.get(i),sizeVictim+"_"+sizeAgresor);
- }
- System.out.println("ERROR NO HAY SEGUIMIENTO...");
- return new Alarma(victimaList.get(sizeVictim-1), agresorList.get(sizeAgresor-1));
- }
- });
- */
- env.execute("GioCEP for Savia - Smartcity ");
- }
- public static void printArraySEs(ArrayList<PosicionEvent> victimaList,ArrayList<PosicionEvent> agresorList, int seg, int v,int a){
- String ele="";
- for(int i=0;i<victimaList.size();i++)
- ele +=victimaList.get(i).getDate()+" , ";
- System.out.println("Vic"+seg+":"+victimaList.size());
- ele="";
- for(int i=0;i<agresorList.size();i++)
- ele +=agresorList.get(i).getDate()+" , ";
- System.out.println("Agre"+seg+":"+agresorList.size());
- System.out.println(victimaList.get(v).getDate()+"_"+agresorList.get(a).getDate());
- }
- public static final class MisElementos implements MapFunction<String,PosicionEvent> {
- @Override
- public PosicionEvent map(String contentAsString) throws Exception {
- try {
- JSONObject jsonObject = new JSONObject(contentAsString);
- String type =jsonObject.getString("type");
- if (type.equals("Agresor")) return new Agresor(jsonObject);
- else if(type.equals("Victima")) return new Victima(jsonObject);
- else return new PosicionEvent();
- } catch (JSONException e) {
- System.out.println("No es JSON:"+contentAsString+"_"+e.toString() );
- // +"counterAlarm:"+counter+"_counterMQTT:"+counterMQTT+"_counterSocket:");
- return new PosicionEvent();
- }
- }
- }
- public static class MiMQTTSource implements SourceFunction<PosicionEvent> {
- private String user = "CTIC-SMARTCITY";
- private String password = "YTICTRAMS-CITC";
- private volatile boolean isRunning = true;
- @Override
- public void run(SourceContext<PosicionEvent> ctx) throws Exception {
- MqttClient client;
- try {
- client = new MqttClient("tcp://localhost", MqttClient.generateClientId());
- MqttConnectOptions options = new MqttConnectOptions();
- options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
- options.setUserName(user);
- options.setPassword(password.toCharArray());
- options.setCleanSession(false);
- options.setAutomaticReconnect(true);
- client.setCallback(new MqttCallback() {
- @Override
- public void connectionLost(Throwable throwable) {
- System.out.println("Desconectado MQTT del Socket! "+new SimpleDateFormat("HH:mm:ss").format(new Date()));
- }
- @Override
- public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
- System.out.println("topic: "+topic);
- String re_type[] = topic.split("/");
- try {
- JSONObject jsonObject = new JSONObject(new String(mqttMessage.getPayload()));
- if (re_type[1].equals("Agresor")){
- ctx.collect(new Agresor(jsonObject,re_type[0],re_type[1]));
- }
- else if (re_type[1].equals("Victima")){
- ctx.collect(new Victima(jsonObject,re_type[0],re_type[1]));
- }
- }catch (JSONException e) {
- System.out.println("No es JSON:"+new String(mqttMessage.getPayload())+"_"+e.toString() );
- //+"counterSegu:"+ParejasSeguimiento+"_counterMQTT:"+counterMQTT);
- }
- }
- @Override
- public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
- }
- });
- client.connect(options);
- client.subscribe("+/+");
- } catch (MqttException e) {
- System.out.println("Expcecion hilo "+e.toString());
- }
- while (!Thread.currentThread().isInterrupted() && isRunning) {
- try {
- sleep(Integer.MAX_VALUE);
- }catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- }
- }
- }
- @Override
- public void cancel() {
- isRunning = false;
- }
- }
- public static long StringDatetoLong(String mdate){
- Date date=new Date();
- try { //2018-04-13 18:28:43
- date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(mdate);
- } catch (ParseException e) {
- System.out.println("Error parsing DATE "+e.toString());
- //e.printStackTrace();
- }
- Timestamp timestamp = new Timestamp(date.getTime());
- return timestamp.getTime();
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement