Advertisement
Guest User

Untitled

a guest
Jan 22nd, 2020
105
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 12.95 KB | None | 0 0
  1. package Main;
  2.  
  3. import java.net.InetSocketAddress;
  4. import java.sql.*;
  5. import java.util.*;
  6.  
  7. import org.apache.ignite.*;
  8. import org.apache.ignite.cache.CacheMode;
  9. import org.apache.ignite.configuration.AddressResolver;
  10. import org.apache.ignite.configuration.CacheConfiguration;
  11. import org.apache.ignite.configuration.DataStorageConfiguration;
  12. import org.apache.ignite.configuration.IgniteConfiguration;
  13. import org.apache.ignite.failure.NoOpFailureHandler;
  14. import org.apache.ignite.logger.log4j.Log4JLogger;
  15. import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
  16. import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
  17. import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
  18. import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
  19. import org.apache.spark.api.java.function.MapFunction;
  20. import org.apache.spark.sql.*;
  21. import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
  22. import org.apache.spark.sql.catalyst.encoders.RowEncoder;
  23. import org.apache.spark.sql.streaming.StreamingQuery;
  24. import org.apache.spark.sql.streaming.StreamingQueryException;
  25. import org.apache.spark.sql.types.DataTypes;
  26. import org.apache.spark.sql.types.StructType;
  27.  
  28. import com.google.gson.*;
  29.  
  30. import Model.*;
  31. import Utils.*;
  32.  
  33. public class Program
  34. {
  35. private static IGsonUtils gsonUtils = new GsonUtils();
  36.  
  37. private static HashMap<String, List<EventInfo>> eventMap = new HashMap<String, List<EventInfo>>();
  38.  
  39. private static Ignite ignite;
  40.  
  41. public static void main(String[] args)
  42. {
  43. System.out.println("App Successfully Started");
  44.  
  45. // TEMP EXAMPLE START
  46.  
  47. List<EventInfo> eventInfosOne = new ArrayList<>();
  48. eventInfosOne.add(new EventInfo("Car", "Name", "Car"));
  49. eventInfosOne.add(new EventInfo("Address", "Name", "Company.Address"));
  50.  
  51. eventMap.put("test_event_one", eventInfosOne);
  52.  
  53. List<EventInfo> eventInfosTwo = new ArrayList<>();
  54. eventInfosTwo.add(new EventInfo("Car", "Name", "Car"));
  55. eventInfosTwo.add(new EventInfo("Address", "Name", "Company.Address"));
  56.  
  57. eventMap.put("test_event_two", eventInfosTwo);
  58.  
  59. // TEMP EXAMPLE END
  60.  
  61. LoadEventsInfo();
  62. LaunchIgnite();
  63. LaunchSpark();
  64. }
  65.  
  66. private static void LoadEventsInfo()
  67. {
  68. System.out.println("Loading Event Info");
  69.  
  70. String connectionString = "jdbc:sqlserver://vm-prod-ci\\prod;databaseName=DataPlatform;user=CMFUser;password=CMFUser";
  71.  
  72. try
  73. {
  74. // Get Connection
  75. Connection connection = DriverManager.getConnection(connectionString);
  76.  
  77. // Query
  78. String query = ";WITH FactoryEventObject_CTE as (\n" +
  79. "select FEO.FactoryEventObjectId, FEO.Name N1, FEOP.Name N2, FEOP.LinkedFactoryEventObjectId, '.' + cast(FEOP.Name as varchar(max)) as [path], FEOP.ReferenceId, FEOP.ReferencePropertyId\n" +
  80. " from CoreDataModel.T_FactoryEventObject FEO\n" +
  81. " inner join CoreDataModel.T_FactoryEventObjectProperty FEOP on FEOP.FactoryEventObjectId = FEO.FactoryEventObjectId\n" +
  82. "where LinkedFactoryEventObjectId is null\n" +
  83. "union all\n" +
  84. "select FEO.FactoryEventObjectId, FEO.Name, FEOP.Name, FEOP.LinkedFactoryEventObjectId, '.' + CAST(parent.N1 + Path as varchar(max)), FEOP.ReferenceId, FEOP.ReferencePropertyId\n" +
  85. " from CoreDataModel.T_FactoryEventObject FEO\n" +
  86. " inner join CoreDataModel.T_FactoryEventObjectProperty FEOP on FEOP.FactoryEventObjectId = FEO.FactoryEventObjectId\n" +
  87. " inner join FactoryEventObject_CTE parent on parent.FactoryEventObjectId = FEOP.LinkedFactoryEventObjectId\n" +
  88. ")\n" +
  89. "select FE.Name FactoryEventName, N1+Path as Path, ET.Name ReferenceName, ETP.Name ReferencePropertyName\n" +
  90. " from FactoryEventObject_CTE a\n" +
  91. "inner join CoreDataModel.T_FactoryEvent FE on FE.FactoryEventObjectId = a.FactoryEventObjectId\n" +
  92. "left join dbo.T_EntityType ET on a.ReferenceId = ET.EntityTypeId\n" +
  93. "left join dbo.T_EntityTypeProperty ETP on a.ReferencePropertyId = ETP.EntityTypePropertyId\n" +
  94. "order by Path;";
  95.  
  96. // Statement
  97. PreparedStatement preparedStatement = connection.prepareStatement(query);
  98.  
  99. // Execute
  100. ResultSet resultSet = preparedStatement.executeQuery();
  101.  
  102. // Loop Results
  103. int count = 0;
  104.  
  105. while(resultSet.next())
  106. {
  107. // Retrieve values from the result set
  108. String eventName = resultSet.getString("FactoryEventName");
  109. String referenceName = resultSet.getString("ReferenceName");
  110. String referencePropertyName = resultSet.getString("ReferencePropertyName");
  111. String path = resultSet.getString("Path");
  112.  
  113. // Generate info class
  114. EventInfo eventInfo = new EventInfo(referenceName, referencePropertyName, path);
  115.  
  116. // Insert new information into the map
  117. if(!eventMap.containsKey(eventName))
  118. {
  119. List<EventInfo> eventInfos = new ArrayList<EventInfo>();
  120. eventInfos.add(eventInfo);
  121. eventMap.put(eventName, eventInfos);
  122. }
  123. else
  124. {
  125. eventMap.get(eventName).add(eventInfo);
  126. }
  127.  
  128. count++;
  129. }
  130.  
  131. System.out.println("Loaded " + count + " Event Information Entries");
  132. }
  133. catch (SQLException e)
  134. {
  135. e.printStackTrace();
  136. }
  137. }
  138.  
  139. private static void LaunchIgnite()
  140. {
  141. System.setProperty("java.net.preferIPv4Stack", "true");
  142.  
  143. System.setProperty("IGNITE_REST_START_ON_CLIENT", "true");
  144.  
  145. // Apache Ignite node configuration.
  146. IgniteConfiguration igniteConfiguration = new IgniteConfiguration();
  147.  
  148. try
  149. {
  150. // Declare Logger
  151. IgniteLogger igniteLogger = new Log4JLogger("log4j.xml");
  152.  
  153. // Set Logger
  154. igniteConfiguration.setGridLogger(igniteLogger);
  155. }
  156. catch (IgniteCheckedException e)
  157. {
  158. e.printStackTrace();
  159. }
  160.  
  161. // Activate Client Mode
  162. igniteConfiguration.setClientMode(true);
  163.  
  164. // Declare tcp discovery spi
  165. TcpDiscoverySpi tcpDiscoverySpi = new TcpDiscoverySpi();
  166.  
  167. // Declare discovery vm ip finder (static ip finder)
  168. TcpDiscoveryVmIpFinder tcpDiscoveryVmIpFinder = new TcpDiscoveryVmIpFinder();
  169.  
  170. // Set addresses of vm ip finder
  171. tcpDiscoveryVmIpFinder.setAddresses(Arrays.asList("10.24.6.23:47500"));
  172.  
  173. // Set the Discovery SPI Ip Finder
  174. tcpDiscoverySpi.setIpFinder(tcpDiscoveryVmIpFinder);
  175.  
  176. // Override the default discovery SPI at the configuration object
  177. igniteConfiguration.setDiscoverySpi(tcpDiscoverySpi);
  178.  
  179. /*
  180. // Ignite persistence configuration
  181. DataStorageConfiguration dataStorageConfiguration = new DataStorageConfiguration();
  182.  
  183. // Enabling the persistence
  184. dataStorageConfiguration.getDefaultDataRegionConfiguration().setPersistenceEnabled(true);
  185.  
  186. // Applying settings
  187. igniteConfiguration.setDataStorageConfiguration(dataStorageConfiguration);
  188. */
  189.  
  190. System.out.println("---------------------------------------- STARTING ----------------------------------------");
  191. System.out.println("---------------------------------------- STARTING ----------------------------------------");
  192. System.out.println("---------------------------------------- STARTING ----------------------------------------");
  193.  
  194. // Starting Ignite
  195. ignite = Ignition.getOrStart(igniteConfiguration);
  196.  
  197. System.out.println("---------------------------------------- STARTED ----------------------------------------");
  198. System.out.println("---------------------------------------- STARTED ----------------------------------------");
  199. System.out.println("---------------------------------------- STARTED ----------------------------------------");
  200.  
  201. /*
  202. // Activate Cluster
  203. ignite.cluster().active(true);
  204. */
  205.  
  206. // Temporary Init
  207. IgniteCache<String, String> carCache = ignite.getOrCreateCache("Car");
  208. carCache.put("CarroEduardo", "{\"Brand\" : \"Bentley\"}");
  209.  
  210. IgniteCache<String, String> addressCache = ignite.getOrCreateCache("Address");
  211. addressCache.put("CMAddress", "{\"Street\" : \"Rua sobe e desce\"}");
  212. }
  213.  
  214. private static void LaunchSpark()
  215. {
  216. // Spark Session
  217. SparkSession sparkSession = SparkSession
  218. .builder()
  219. .appName("Spark Structured Streaming")
  220. .getOrCreate();
  221.  
  222. // Streaming Read
  223. Dataset<Row> dataset = sparkSession
  224. .readStream()
  225. .format("kafka")
  226. .option("kafka.bootstrap.servers", "10.24.16.19:9093,10.24.16.19:9094,10.24.16.19:9095")
  227. .option("subscribePattern", ".*_raw")
  228. .load();
  229.  
  230. // Kafka Specific Selection
  231. dataset = dataset.selectExpr("CAST(topic AS STRING)" , "CAST(key AS STRING)", "CAST(value AS STRING)");
  232.  
  233. // Transformation
  234. StructType structType = new StructType();
  235. structType = structType.add("topic", DataTypes.StringType, false);
  236. structType = structType.add("key", DataTypes.StringType, true);
  237. structType = structType.add("value", DataTypes.StringType, false);
  238.  
  239. ExpressionEncoder<Row> encoder = RowEncoder.apply(structType);
  240.  
  241. MapFunction<Row, Row> myMapFunction = (row ->
  242. {
  243. // Get Values
  244. String rawTopic = row.getString(0);
  245. String value = row.getString(2);
  246.  
  247. // Compute New Topic
  248. String eventName = rawTopic.substring(0, rawTopic.length() - 3);
  249. String enrichedTopic = eventName + "enriched";
  250.  
  251. // Enrich
  252. JsonParser parser = new JsonParser();
  253. JsonObject jsonObject = parser.parse(value).getAsJsonObject();
  254.  
  255. System.out.println("---------- PRE UPDATED VALUE ----------");
  256. System.out.println(jsonObject.toString());
  257.  
  258. List<EventInfo> eventInfos = eventMap.get(eventName);
  259.  
  260. if(eventInfos != null)
  261. {
  262. for (EventInfo eventInfo : eventInfos)
  263. {
  264. String propertyValue = gsonUtils.ExtractProperty(jsonObject, eventInfo.getPath());
  265.  
  266. IgniteCache<String, String> cache = ignite.getOrCreateCache(eventInfo.getReferenceName());
  267. String cacheValue = cache.get(propertyValue);
  268.  
  269. System.out.println("Cache Value " + cacheValue);
  270. System.out.println(eventInfo);
  271. System.out.println(propertyValue);
  272. System.out.println();
  273.  
  274. if (cacheValue == null)
  275. {
  276. // Cache Miss
  277. System.out.println("Cache Miss");
  278. }
  279. else
  280. {
  281. System.out.println("---------- UPDATING ----------");
  282. System.out.println("UPDATE PATH: " + eventInfo.getPath());
  283. System.out.println("UPDATE VALUE: " + cacheValue);
  284. System.out.println(jsonObject.toString());
  285.  
  286. gsonUtils.UpdateProperty(jsonObject, eventInfo.getPath(), cacheValue);
  287. }
  288. }
  289. }
  290.  
  291. System.out.println("---------- POS UPDATED VALUE ----------");
  292. System.out.println(jsonObject.toString());
  293.  
  294. String modifiedValue = jsonObject.toString();
  295.  
  296. // Create
  297. return RowFactory.create(enrichedTopic, row.getString(1), modifiedValue);
  298. });
  299.  
  300. dataset = dataset.map(myMapFunction, encoder).toDF("topic", "key", "value");
  301.  
  302. // Streaming Write
  303. StreamingQuery streamingQuery = dataset
  304. .selectExpr("CAST(topic AS STRING)", "CAST(key AS STRING)", "CAST(value AS STRING)")
  305. .writeStream()
  306. .format("kafka")
  307. .option("kafka.bootstrap.servers", "10.24.16.19:9093,10.24.16.19:9094,10.24.16.19:9095")
  308. .option("checkpointLocation", "checkpoint")
  309. .start();
  310.  
  311. // Loop
  312. try
  313. {
  314. streamingQuery.awaitTermination();
  315. }
  316. catch (StreamingQueryException e)
  317. {
  318. e.printStackTrace();
  319. }
  320. }
  321. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement