Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <groupId>flink-satrt2</groupId>
- <artifactId>flink-satrt2</artifactId>
- <version>1.0</version>
- <packaging>jar</packaging>
- <name>Flink Quickstart Job</name>
- <url>http://www.myorganization.org</url>
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <flink.version>1.5.0</flink.version>
- <java.version>1.8</java.version>
- <scala.binary.version>2.11</scala.binary.version>
- <maven.compiler.source>${java.version}</maven.compiler.source>
- <maven.compiler.target>${java.version}</maven.compiler.target>
- </properties>
- <repositories>
- <repository>
- <id>apache.snapshots</id>
- <name>Apache Development Snapshot Repository</name>
- <url>https://repository.apache.org/content/repositories/snapshots/</url>
- <releases>
- <enabled>false</enabled>
- </releases>
- <snapshots>
- <enabled>true</enabled>
- </snapshots>
- </repository>
- </repositories>
- <dependencies>
- <!-- Mqtt dependencies and other additional dependencies -->
- <dependency>
- <groupId>org.eclipse.paho</groupId>
- <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
- <version>1.1.1</version>
- </dependency>
- <dependency>
- <groupId>com.googlecode.json-simple</groupId>
- <artifactId>json-simple</artifactId>
- <version>1.1</version>
- </dependency>
- <dependency>
- <groupId>com.google.code.gson</groupId>
- <artifactId>gson</artifactId>
- <version>2.8.1</version>
- </dependency>
- <dependency>
- <groupId>org.mongodb</groupId>
- <artifactId>mongo-java-driver</artifactId>
- <version>3.8.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.eagle</groupId>
- <artifactId>eagle-jpm-aggregation</artifactId>
- <version>0.5.0-incubating-SNAPSHOT</version>
- <scope>compile</scope>
- </dependency>
- <!-- Apache Flink dependencies -->
- <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-java</artifactId>
- <version>${flink.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
- <version>${flink.version}</version>
- <scope>provided</scope>
- </dependency>
- <!-- Add connector dependencies here. They must be in the default scope (compile). -->
- <!-- Example:
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
- <version>${flink.version}</version>
- </dependency>
- -->
- <!-- Add logging framework, to produce console output when running in the IDE. -->
- <!-- These dependencies are excluded from the application JAR by default. -->
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- <version>1.7.7</version>
- <scope>runtime</scope>
- </dependency>
- <dependency>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- <version>1.2.17</version>
- <scope>runtime</scope>
- </dependency>
- </dependencies>
- <build>
- <plugins>
- <!-- Java Compiler -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.1</version>
- <configuration>
- <source>${java.version}</source>
- <target>${java.version}</target>
- </configuration>
- </plugin>
- <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
- <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <version>3.0.0</version>
- <executions>
- <!-- Run shade goal on package phase -->
- <execution>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- <configuration>
- <artifactSet>
- <excludes>
- <exclude>org.apache.flink:force-shading</exclude>
- <exclude>com.google.code.findbugs:jsr305</exclude>
- <exclude>org.slf4j:*</exclude>
- <exclude>log4j:*</exclude>
- </excludes>
- </artifactSet>
- <filters>
- <filter>
- <!-- Do not copy the signatures in the META-INF folder.
- Otherwise, this might cause SecurityExceptions when using the JAR. -->
- <artifact>*:*</artifact>
- <excludes>
- <exclude>META-INF/*.SF</exclude>
- <exclude>META-INF/*.DSA</exclude>
- <exclude>META-INF/*.RSA</exclude>
- </excludes>
- </filter>
- </filters>
- <transformers>
- <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
- <mainClass>MetrixAnalysis</mainClass>
- </transformer>
- </transformers>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- <pluginManagement>
- <plugins>
- <!-- If you want to use Java 8 Lambda Expressions uncomment the following lines -->
- <!--
- <plugin>
- <artifactId>maven-compiler-plugin</artifactId>
- <configuration>
- <source>${java.version}</source>
- <target>${java.version}</target>
- <compilerId>jdt</compilerId>
- </configuration>
- <dependencies>
- <dependency>
- <groupId>org.eclipse.tycho</groupId>
- <artifactId>tycho-compiler-jdt</artifactId>
- <version>0.21.0</version>
- </dependency>
- </dependencies>
- </plugin>
- -->
- <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
- <plugin>
- <groupId>org.eclipse.m2e</groupId>
- <artifactId>lifecycle-mapping</artifactId>
- <version>1.0.0</version>
- <configuration>
- <lifecycleMappingMetadata>
- <pluginExecutions>
- <pluginExecution>
- <pluginExecutionFilter>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <versionRange>[3.0.0,)</versionRange>
- <goals>
- <goal>shade</goal>
- </goals>
- </pluginExecutionFilter>
- <action>
- <ignore/>
- </action>
- </pluginExecution>
- <pluginExecution>
- <pluginExecutionFilter>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <versionRange>[3.1,)</versionRange>
- <goals>
- <goal>testCompile</goal>
- <goal>compile</goal>
- </goals>
- </pluginExecutionFilter>
- <action>
- <ignore/>
- </action>
- </pluginExecution>
- </pluginExecutions>
- </lifecycleMappingMetadata>
- </configuration>
- </plugin>
- </plugins>
- </pluginManagement>
- </build>
- <!-- This profile helps to make things run out of the box in IntelliJ -->
- <!-- Its adds Flink's core classes to the runtime class path. -->
- <!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
- <profiles>
- <profile>
- <id>add-dependencies-for-IDEA</id>
- <activation>
- <property>
- <name>idea.version</name>
- </property>
- </activation>
- <dependencies>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-java</artifactId>
- <version>${flink.version}</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
- <version>${flink.version}</version>
- <scope>compile</scope>
- </dependency>
- </dependencies>
- </profile>
- </profiles>
- </project>
- public class MetrixAnalysis
- {
- private static int falseNegativeCount = 0,falsePositiveCount = 0,faceDetectedCount =0,successCount = 0;
- public static void main(String args[]) throws Exception
- {
- StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
- String clientId = MqttConstant.CLIENT_ID;
- String serverUrl =MqttConstant.URL;
- String password =MqttConstant.PASSWORD;
- String USERNAME =MqttConstant.USERNAME;
- MqttConnectOptions connectOptions = new MqttConnectOptions();
- connectOptions.setUserName(USERNAME);
- connectOptions.setPassword(password.toCharArray());
- System.out.println(connectOptions.getUserName()+" "+connectOptions.getPassword());
- MqttClient client = new MqttClient(serverUrl, clientId);
- client.connect(connectOptions);
- System.out.println("SUBSCRIBED_TOPICS:-");
- for (Map.Entry<String,String> topic:MqttConstant.TOPICS.entrySet())
- {
- System.out.println(topic.getValue());
- client.subscribe(topic.getValue()) ;
- }
- System.out.println("n"+"PUBLISHING_TOPIC:-"+MqttConstant.TOPICS.get("PUBLISHING_TOPIC")+"n");
- DataStream<String> dataStream= env.readTextFile("/home/adeeshwar/flinkProjects/flink-start3/Abc.txt");
- client.setTimeToWait(5000);
- client.setCallback(new MqttCallback()
- {
- @Override
- public void connectionLost(Throwable throwable)
- { System.out.println("Connnection Lost!!!!!!!!!!!!!!!!!!!!");}
- @Override
- public void messageArrived(String s, MqttMessage mqttMessage) throws Exception
- {
- //count=0;
- SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MMM-dd");
- Date date = new Date();
- //mongo db connection with database and collection
- System.out.println("nMESSAGE_ARRIVED:-n"+mqttMessage+"n");
- MongoClient mongoClient = new MongoClient("localhost", 27017);
- DB database = mongoClient.getDB("Faces");
- DBCollection fetchCollection = database.getCollection("DeviceAnalysis");
- //getting stream coming from mqtt.fx in payload
- String payload=new String(mqttMessage.getPayload());
- System.out.println("nPAYLOAD:-n"+payload+"n");
- try
- {
- //creating json object for stream
- JSONObject obj = new JSONObject(payload);
- System.out.println("JSON_OBJECT:-n" + obj + "n");
- //fetching fields from jsonobject
- String device_id = obj.getString("device_id");
- Long timestamp = obj.getLong("timestamp");
- String version = obj.getString("version");
- String model;
- Date resultdate = new Date(timestamp);
- String streamDate=sdf.format(resultdate);
- //creating BasicDBObject for document
- BasicDBObject facialStream= new BasicDBObject();
- //check model value and insert in model variable
- if(obj.has("model"))
- model = obj.getString("model");
- else
- model="";
- //for type field
- String streamType=null ;//= obj.getString("type");
- // String type1 = null;
- if(obj.has("face"))
- {
- if(obj.getString("face").equals(""))
- {
- streamType="snapshot";
- }
- else if(obj.getString("face").equals("HexCodedString"))
- {
- streamType="faceDetected";
- }
- }
- else
- {
- streamType=obj.getString("type");
- }
- //for and query on the basis of device_id,version,model,date
- BasicDBObject andQuery=new BasicDBObject();
- List<BasicDBObject> obj1= new ArrayList<BasicDBObject>();
- obj1.add(new BasicDBObject("device_id",device_id));
- obj1.add(new BasicDBObject("version",version));
- obj1.add(new BasicDBObject("date",streamDate));
- obj1.add(new BasicDBObject("model",model));
- // obj1.add(new BasicDBObject("type",streamType));
- andQuery.put("$and",obj1);
- //count of fetched document
- Cursor fetchRecords= fetchCollection.find(andQuery);
- int recordsCount = fetchCollection.find(andQuery).count();
- System.out.print("FetchedRecordsCount: "+recordsCount+"n");
- Boolean flag =false;
- if(recordsCount>0)
- {
- System.out.println("When Fetched Records>0n");
- falseNegativeCount = 0;falsePositiveCount = 0;faceDetectedCount =0;successCount = 0; //,snapshotsCount = 0;
- while (fetchRecords.hasNext())
- {
- BasicDBObject fetchedRecord = (BasicDBObject) fetchRecords.next();
- String recordType=fetchedRecord.getString("type");
- int recordCount=fetchedRecord.getInt("count");
- if (recordType.equals(streamType))
- {
- flag=true;
- System.out.println("Fetched Records type matches with Input Facial Streamn");
- BasicDBObject updateQuerry1=new BasicDBObject().append("$set",new BasicDBObject().append("count",++recordCount).append("timestamp",timestamp));
- fetchCollection.update(fetchedRecord,updateQuerry1);
- }
- calculatingSuccessCount(recordType,recordCount);
- }
- if(!flag)
- {
- System.out.println("Fetched Records type does not match with Input Facial Streamn");
- saveNewRecord(facialStream,device_id,version,model,streamType,timestamp,streamDate,fetchCollection);
- calculatingSuccessCount(streamType,1);
- }
- successCount = faceDetectedCount - falsePositiveCount - falseNegativeCount;
- fetchRecords= fetchCollection.find(andQuery);
- while (fetchRecords.hasNext())
- {
- BasicDBObject fetchedRecord = (BasicDBObject) fetchRecords.next();
- BasicDBObject updateQuerry2=new BasicDBObject().append("$set",new BasicDBObject().append("successCount",successCount));
- fetchCollection.update(fetchedRecord,updateQuerry2);
- }
- }
- else
- {
- System.out.println("When Fetched Records=0n");
- saveNewRecord(facialStream,device_id,version,model,streamType,timestamp,streamDate,fetchCollection);
- }
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- }
- @Override
- public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken)
- { }
- });
- dataStream.print();
- env.execute();
- }
- Starting execution of program
- orahi [C@41fecb8b
- SUBSCRIBED_TOPICS:-
- /flink/mqtt
- /mqtt/flink/camera
- /mqtt/flink
- PUBLISHING_TOPIC:-/flink/mqtt
- Program execution finished
- Job with JobID c9afe364c1f3c10232e877bc4e0dcf89 has finished.
- Job Runtime: 3942 ms
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement