Advertisement
Guest User

Untitled

a guest
Jul 23rd, 2018
176
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 18.85 KB | None | 0 0
  1. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  2. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  3. <modelVersion>4.0.0</modelVersion>
  4.  
  5. <groupId>flink-satrt2</groupId>
  6. <artifactId>flink-satrt2</artifactId>
  7. <version>1.0</version>
  8. <packaging>jar</packaging>
  9.  
  10. <name>Flink Quickstart Job</name>
  11. <url>http://www.myorganization.org</url>
  12.  
  13. <properties>
  14. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  15. <flink.version>1.5.0</flink.version>
  16. <java.version>1.8</java.version>
  17. <scala.binary.version>2.11</scala.binary.version>
  18. <maven.compiler.source>${java.version}</maven.compiler.source>
  19. <maven.compiler.target>${java.version}</maven.compiler.target>
  20. </properties>
  21.  
  22. <repositories>
  23. <repository>
  24. <id>apache.snapshots</id>
  25. <name>Apache Development Snapshot Repository</name>
  26. <url>https://repository.apache.org/content/repositories/snapshots/</url>
  27. <releases>
  28. <enabled>false</enabled>
  29. </releases>
  30. <snapshots>
  31. <enabled>true</enabled>
  32. </snapshots>
  33. </repository>
  34. </repositories>
  35.  
  36. <dependencies>
  37. <!-- Mqtt dependencies and other additional dependencies -->
  38. <dependency>
  39. <groupId>org.eclipse.paho</groupId>
  40. <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
  41. <version>1.1.1</version>
  42. </dependency>
  43. <dependency>
  44. <groupId>com.googlecode.json-simple</groupId>
  45. <artifactId>json-simple</artifactId>
  46. <version>1.1</version>
  47. </dependency>
  48. <dependency>
  49. <groupId>com.google.code.gson</groupId>
  50. <artifactId>gson</artifactId>
  51. <version>2.8.1</version>
  52. </dependency>
  53. <dependency>
  54. <groupId>org.mongodb</groupId>
  55. <artifactId>mongo-java-driver</artifactId>
  56. <version>3.8.0</version>
  57. </dependency>
  58. <dependency>
  59. <groupId>org.apache.eagle</groupId>
  60. <artifactId>eagle-jpm-aggregation</artifactId>
  61. <version>0.5.0-incubating-SNAPSHOT</version>
  62. <scope>compile</scope>
  63. </dependency>
  64.  
  65. <!-- Apache Flink dependencies -->
  66. <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
  67. <dependency>
  68. <groupId>org.apache.flink</groupId>
  69. <artifactId>flink-java</artifactId>
  70. <version>${flink.version}</version>
  71. <scope>provided</scope>
  72. </dependency>
  73. <dependency>
  74. <groupId>org.apache.flink</groupId>
  75. <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
  76. <version>${flink.version}</version>
  77. <scope>provided</scope>
  78. </dependency>
  79.  
  80. <!-- Add connector dependencies here. They must be in the default scope (compile). -->
  81.  
  82. <!-- Example:
  83.  
  84. <dependency>
  85. <groupId>org.apache.flink</groupId>
  86. <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
  87. <version>${flink.version}</version>
  88. </dependency>
  89. -->
  90.  
  91. <!-- Add logging framework, to produce console output when running in the IDE. -->
  92. <!-- These dependencies are excluded from the application JAR by default. -->
  93. <dependency>
  94. <groupId>org.slf4j</groupId>
  95. <artifactId>slf4j-log4j12</artifactId>
  96. <version>1.7.7</version>
  97. <scope>runtime</scope>
  98. </dependency>
  99. <dependency>
  100. <groupId>log4j</groupId>
  101. <artifactId>log4j</artifactId>
  102. <version>1.2.17</version>
  103. <scope>runtime</scope>
  104. </dependency>
  105. </dependencies>
  106.  
  107. <build>
  108. <plugins>
  109.  
  110. <!-- Java Compiler -->
  111. <plugin>
  112. <groupId>org.apache.maven.plugins</groupId>
  113. <artifactId>maven-compiler-plugin</artifactId>
  114. <version>3.1</version>
  115. <configuration>
  116. <source>${java.version}</source>
  117. <target>${java.version}</target>
  118. </configuration>
  119. </plugin>
  120.  
  121. <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
  122. <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
  123. <plugin>
  124. <groupId>org.apache.maven.plugins</groupId>
  125. <artifactId>maven-shade-plugin</artifactId>
  126. <version>3.0.0</version>
  127. <executions>
  128. <!-- Run shade goal on package phase -->
  129. <execution>
  130. <phase>package</phase>
  131. <goals>
  132. <goal>shade</goal>
  133. </goals>
  134. <configuration>
  135. <artifactSet>
  136. <excludes>
  137. <exclude>org.apache.flink:force-shading</exclude>
  138. <exclude>com.google.code.findbugs:jsr305</exclude>
  139. <exclude>org.slf4j:*</exclude>
  140. <exclude>log4j:*</exclude>
  141. </excludes>
  142. </artifactSet>
  143. <filters>
  144. <filter>
  145. <!-- Do not copy the signatures in the META-INF folder.
  146. Otherwise, this might cause SecurityExceptions when using the JAR. -->
  147. <artifact>*:*</artifact>
  148. <excludes>
  149. <exclude>META-INF/*.SF</exclude>
  150. <exclude>META-INF/*.DSA</exclude>
  151. <exclude>META-INF/*.RSA</exclude>
  152. </excludes>
  153. </filter>
  154. </filters>
  155. <transformers>
  156. <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
  157. <mainClass>MetrixAnalysis</mainClass>
  158. </transformer>
  159. </transformers>
  160. </configuration>
  161. </execution>
  162. </executions>
  163. </plugin>
  164. </plugins>
  165.  
  166. <pluginManagement>
  167. <plugins>
  168.  
  169. <!-- If you want to use Java 8 Lambda Expressions uncomment the following lines -->
  170. <!--
  171. <plugin>
  172. <artifactId>maven-compiler-plugin</artifactId>
  173. <configuration>
  174. <source>${java.version}</source>
  175. <target>${java.version}</target>
  176. <compilerId>jdt</compilerId>
  177. </configuration>
  178. <dependencies>
  179. <dependency>
  180. <groupId>org.eclipse.tycho</groupId>
  181. <artifactId>tycho-compiler-jdt</artifactId>
  182. <version>0.21.0</version>
  183. </dependency>
  184. </dependencies>
  185. </plugin>
  186. -->
  187.  
  188. <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
  189. <plugin>
  190. <groupId>org.eclipse.m2e</groupId>
  191. <artifactId>lifecycle-mapping</artifactId>
  192. <version>1.0.0</version>
  193. <configuration>
  194. <lifecycleMappingMetadata>
  195. <pluginExecutions>
  196. <pluginExecution>
  197. <pluginExecutionFilter>
  198. <groupId>org.apache.maven.plugins</groupId>
  199. <artifactId>maven-shade-plugin</artifactId>
  200. <versionRange>[3.0.0,)</versionRange>
  201. <goals>
  202. <goal>shade</goal>
  203. </goals>
  204. </pluginExecutionFilter>
  205. <action>
  206. <ignore/>
  207. </action>
  208. </pluginExecution>
  209. <pluginExecution>
  210. <pluginExecutionFilter>
  211. <groupId>org.apache.maven.plugins</groupId>
  212. <artifactId>maven-compiler-plugin</artifactId>
  213. <versionRange>[3.1,)</versionRange>
  214. <goals>
  215. <goal>testCompile</goal>
  216. <goal>compile</goal>
  217. </goals>
  218. </pluginExecutionFilter>
  219. <action>
  220. <ignore/>
  221. </action>
  222. </pluginExecution>
  223. </pluginExecutions>
  224. </lifecycleMappingMetadata>
  225. </configuration>
  226. </plugin>
  227. </plugins>
  228. </pluginManagement>
  229. </build>
  230.  
  231. <!-- This profile helps to make things run out of the box in IntelliJ -->
  232. <!-- Its adds Flink's core classes to the runtime class path. -->
  233. <!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
  234. <profiles>
  235. <profile>
  236. <id>add-dependencies-for-IDEA</id>
  237.  
  238. <activation>
  239. <property>
  240. <name>idea.version</name>
  241. </property>
  242. </activation>
  243.  
  244. <dependencies>
  245. <dependency>
  246. <groupId>org.apache.flink</groupId>
  247. <artifactId>flink-java</artifactId>
  248. <version>${flink.version}</version>
  249. <scope>compile</scope>
  250. </dependency>
  251. <dependency>
  252. <groupId>org.apache.flink</groupId>
  253. <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
  254. <version>${flink.version}</version>
  255. <scope>compile</scope>
  256. </dependency>
  257. </dependencies>
  258. </profile>
  259. </profiles>
  260.  
  261. </project>
  262.  
  263. public class MetrixAnalysis
  264. {
  265. private static int falseNegativeCount = 0,falsePositiveCount = 0,faceDetectedCount =0,successCount = 0;
  266. public static void main(String args[]) throws Exception
  267. {
  268. StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
  269.  
  270. String clientId = MqttConstant.CLIENT_ID;
  271. String serverUrl =MqttConstant.URL;
  272.  
  273. String password =MqttConstant.PASSWORD;
  274. String USERNAME =MqttConstant.USERNAME;
  275.  
  276. MqttConnectOptions connectOptions = new MqttConnectOptions();
  277. connectOptions.setUserName(USERNAME);
  278. connectOptions.setPassword(password.toCharArray());
  279.  
  280. System.out.println(connectOptions.getUserName()+" "+connectOptions.getPassword());
  281.  
  282. MqttClient client = new MqttClient(serverUrl, clientId);
  283. client.connect(connectOptions);
  284.  
  285. System.out.println("SUBSCRIBED_TOPICS:-");
  286. for (Map.Entry<String,String> topic:MqttConstant.TOPICS.entrySet())
  287. {
  288. System.out.println(topic.getValue());
  289. client.subscribe(topic.getValue()) ;
  290. }
  291. System.out.println("n"+"PUBLISHING_TOPIC:-"+MqttConstant.TOPICS.get("PUBLISHING_TOPIC")+"n");
  292.  
  293. DataStream<String> dataStream= env.readTextFile("/home/adeeshwar/flinkProjects/flink-start3/Abc.txt");
  294. client.setTimeToWait(5000);
  295.  
  296. client.setCallback(new MqttCallback()
  297. {
  298. @Override
  299. public void connectionLost(Throwable throwable)
  300. { System.out.println("Connnection Lost!!!!!!!!!!!!!!!!!!!!");}
  301.  
  302. @Override
  303. public void messageArrived(String s, MqttMessage mqttMessage) throws Exception
  304. {
  305. //count=0;
  306. SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MMM-dd");
  307. Date date = new Date();
  308.  
  309. //mongo db connection with database and collection
  310. System.out.println("nMESSAGE_ARRIVED:-n"+mqttMessage+"n");
  311. MongoClient mongoClient = new MongoClient("localhost", 27017);
  312. DB database = mongoClient.getDB("Faces");
  313. DBCollection fetchCollection = database.getCollection("DeviceAnalysis");
  314.  
  315. //getting stream coming from mqtt.fx in payload
  316. String payload=new String(mqttMessage.getPayload());
  317. System.out.println("nPAYLOAD:-n"+payload+"n");
  318.  
  319. try
  320. {
  321. //creating json object for stream
  322. JSONObject obj = new JSONObject(payload);
  323. System.out.println("JSON_OBJECT:-n" + obj + "n");
  324.  
  325. //fetching fields from jsonobject
  326. String device_id = obj.getString("device_id");
  327. Long timestamp = obj.getLong("timestamp");
  328. String version = obj.getString("version");
  329. String model;
  330. Date resultdate = new Date(timestamp);
  331. String streamDate=sdf.format(resultdate);
  332.  
  333. //creating BasicDBObject for document
  334. BasicDBObject facialStream= new BasicDBObject();
  335.  
  336. //check model value and insert in model variable
  337. if(obj.has("model"))
  338. model = obj.getString("model");
  339. else
  340. model="";
  341.  
  342. //for type field
  343. String streamType=null ;//= obj.getString("type");
  344. // String type1 = null;
  345.  
  346. if(obj.has("face"))
  347. {
  348. if(obj.getString("face").equals(""))
  349. {
  350. streamType="snapshot";
  351. }
  352. else if(obj.getString("face").equals("HexCodedString"))
  353. {
  354. streamType="faceDetected";
  355. }
  356. }
  357. else
  358. {
  359. streamType=obj.getString("type");
  360. }
  361.  
  362. //for and query on the basis of device_id,version,model,date
  363. BasicDBObject andQuery=new BasicDBObject();
  364. List<BasicDBObject> obj1= new ArrayList<BasicDBObject>();
  365. obj1.add(new BasicDBObject("device_id",device_id));
  366. obj1.add(new BasicDBObject("version",version));
  367. obj1.add(new BasicDBObject("date",streamDate));
  368. obj1.add(new BasicDBObject("model",model));
  369. // obj1.add(new BasicDBObject("type",streamType));
  370.  
  371. andQuery.put("$and",obj1);
  372.  
  373. //count of fetched document
  374. Cursor fetchRecords= fetchCollection.find(andQuery);
  375. int recordsCount = fetchCollection.find(andQuery).count();
  376. System.out.print("FetchedRecordsCount: "+recordsCount+"n");
  377.  
  378. Boolean flag =false;
  379. if(recordsCount>0)
  380. {
  381. System.out.println("When Fetched Records>0n");
  382. falseNegativeCount = 0;falsePositiveCount = 0;faceDetectedCount =0;successCount = 0; //,snapshotsCount = 0;
  383.  
  384. while (fetchRecords.hasNext())
  385. {
  386. BasicDBObject fetchedRecord = (BasicDBObject) fetchRecords.next();
  387. String recordType=fetchedRecord.getString("type");
  388. int recordCount=fetchedRecord.getInt("count");
  389.  
  390. if (recordType.equals(streamType))
  391. {
  392. flag=true;
  393. System.out.println("Fetched Records type matches with Input Facial Streamn");
  394. BasicDBObject updateQuerry1=new BasicDBObject().append("$set",new BasicDBObject().append("count",++recordCount).append("timestamp",timestamp));
  395. fetchCollection.update(fetchedRecord,updateQuerry1);
  396. }
  397. calculatingSuccessCount(recordType,recordCount);
  398.  
  399. }
  400.  
  401. if(!flag)
  402. {
  403. System.out.println("Fetched Records type does not match with Input Facial Streamn");
  404. saveNewRecord(facialStream,device_id,version,model,streamType,timestamp,streamDate,fetchCollection);
  405. calculatingSuccessCount(streamType,1);
  406. }
  407. successCount = faceDetectedCount - falsePositiveCount - falseNegativeCount;
  408.  
  409. fetchRecords= fetchCollection.find(andQuery);
  410. while (fetchRecords.hasNext())
  411. {
  412. BasicDBObject fetchedRecord = (BasicDBObject) fetchRecords.next();
  413. BasicDBObject updateQuerry2=new BasicDBObject().append("$set",new BasicDBObject().append("successCount",successCount));
  414. fetchCollection.update(fetchedRecord,updateQuerry2);
  415. }
  416. }
  417. else
  418. {
  419. System.out.println("When Fetched Records=0n");
  420. saveNewRecord(facialStream,device_id,version,model,streamType,timestamp,streamDate,fetchCollection);
  421. }
  422.  
  423. }
  424. catch (Exception e)
  425. {
  426. e.printStackTrace();
  427. }
  428. }
  429.  
  430. @Override
  431. public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken)
  432. { }
  433. });
  434. dataStream.print();
  435. env.execute();
  436. }
  437.  
  438. Starting execution of program
  439. orahi [C@41fecb8b
  440. SUBSCRIBED_TOPICS:-
  441. /flink/mqtt
  442. /mqtt/flink/camera
  443. /mqtt/flink
  444.  
  445. PUBLISHING_TOPIC:-/flink/mqtt
  446.  
  447. Program execution finished
  448. Job with JobID c9afe364c1f3c10232e877bc4e0dcf89 has finished.
  449. Job Runtime: 3942 ms
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement