Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package Main;
- import java.net.InetSocketAddress;
- import java.sql.*;
- import java.util.*;
- import org.apache.ignite.*;
- import org.apache.ignite.cache.CacheMode;
- import org.apache.ignite.configuration.AddressResolver;
- import org.apache.ignite.configuration.CacheConfiguration;
- import org.apache.ignite.configuration.DataStorageConfiguration;
- import org.apache.ignite.configuration.IgniteConfiguration;
- import org.apache.ignite.failure.NoOpFailureHandler;
- import org.apache.ignite.logger.log4j.Log4JLogger;
- import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
- import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
- import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
- import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
- import org.apache.spark.api.java.function.MapFunction;
- import org.apache.spark.sql.*;
- import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
- import org.apache.spark.sql.catalyst.encoders.RowEncoder;
- import org.apache.spark.sql.streaming.StreamingQuery;
- import org.apache.spark.sql.streaming.StreamingQueryException;
- import org.apache.spark.sql.types.DataTypes;
- import org.apache.spark.sql.types.StructType;
- import com.google.gson.*;
- import Model.*;
- import Utils.*;
- public class Program
- {
- private static IGsonUtils gsonUtils = new GsonUtils();
- private static HashMap<String, List<EventInfo>> eventMap = new HashMap<String, List<EventInfo>>();
- private static Ignite ignite;
- public static void main(String[] args)
- {
- System.out.println("App Successfully Started");
- // TEMP EXAMPLE START
- List<EventInfo> eventInfosOne = new ArrayList<>();
- eventInfosOne.add(new EventInfo("Car", "Name", "Car"));
- eventInfosOne.add(new EventInfo("Address", "Name", "Company.Address"));
- eventMap.put("test_event_one", eventInfosOne);
- List<EventInfo> eventInfosTwo = new ArrayList<>();
- eventInfosTwo.add(new EventInfo("Car", "Name", "Car"));
- eventInfosTwo.add(new EventInfo("Address", "Name", "Company.Address"));
- eventMap.put("test_event_two", eventInfosTwo);
- // TEMP EXAMPLE END
- LoadEventsInfo();
- LaunchIgnite();
- LaunchSpark();
- }
- private static void LoadEventsInfo()
- {
- System.out.println("Loading Event Info");
- String connectionString = "jdbc:sqlserver://vm-prod-ci\\prod;databaseName=DataPlatform;user=CMFUser;password=CMFUser";
- try
- {
- // Get Connection
- Connection connection = DriverManager.getConnection(connectionString);
- // Query
- String query = ";WITH FactoryEventObject_CTE as (\n" +
- "select FEO.FactoryEventObjectId, FEO.Name N1, FEOP.Name N2, FEOP.LinkedFactoryEventObjectId, '.' + cast(FEOP.Name as varchar(max)) as [path], FEOP.ReferenceId, FEOP.ReferencePropertyId\n" +
- " from CoreDataModel.T_FactoryEventObject FEO\n" +
- " inner join CoreDataModel.T_FactoryEventObjectProperty FEOP on FEOP.FactoryEventObjectId = FEO.FactoryEventObjectId\n" +
- "where LinkedFactoryEventObjectId is null\n" +
- "union all\n" +
- "select FEO.FactoryEventObjectId, FEO.Name, FEOP.Name, FEOP.LinkedFactoryEventObjectId, '.' + CAST(parent.N1 + Path as varchar(max)), FEOP.ReferenceId, FEOP.ReferencePropertyId\n" +
- " from CoreDataModel.T_FactoryEventObject FEO\n" +
- " inner join CoreDataModel.T_FactoryEventObjectProperty FEOP on FEOP.FactoryEventObjectId = FEO.FactoryEventObjectId\n" +
- " inner join FactoryEventObject_CTE parent on parent.FactoryEventObjectId = FEOP.LinkedFactoryEventObjectId\n" +
- ")\n" +
- "select FE.Name FactoryEventName, N1+Path as Path, ET.Name ReferenceName, ETP.Name ReferencePropertyName\n" +
- " from FactoryEventObject_CTE a\n" +
- "inner join CoreDataModel.T_FactoryEvent FE on FE.FactoryEventObjectId = a.FactoryEventObjectId\n" +
- "left join dbo.T_EntityType ET on a.ReferenceId = ET.EntityTypeId\n" +
- "left join dbo.T_EntityTypeProperty ETP on a.ReferencePropertyId = ETP.EntityTypePropertyId\n" +
- "order by Path;";
- // Statement
- PreparedStatement preparedStatement = connection.prepareStatement(query);
- // Execute
- ResultSet resultSet = preparedStatement.executeQuery();
- // Loop Results
- int count = 0;
- while(resultSet.next())
- {
- // Retrieve values from the result set
- String eventName = resultSet.getString("FactoryEventName");
- String referenceName = resultSet.getString("ReferenceName");
- String referencePropertyName = resultSet.getString("ReferencePropertyName");
- String path = resultSet.getString("Path");
- // Generate info class
- EventInfo eventInfo = new EventInfo(referenceName, referencePropertyName, path);
- // Insert new information into the map
- if(!eventMap.containsKey(eventName))
- {
- List<EventInfo> eventInfos = new ArrayList<EventInfo>();
- eventInfos.add(eventInfo);
- eventMap.put(eventName, eventInfos);
- }
- else
- {
- eventMap.get(eventName).add(eventInfo);
- }
- count++;
- }
- System.out.println("Loaded " + count + " Event Information Entries");
- }
- catch (SQLException e)
- {
- e.printStackTrace();
- }
- }
- private static void LaunchIgnite()
- {
- System.setProperty("java.net.preferIPv4Stack", "true");
- System.setProperty("IGNITE_REST_START_ON_CLIENT", "true");
- // Apache Ignite node configuration.
- IgniteConfiguration igniteConfiguration = new IgniteConfiguration();
- try
- {
- // Declare Logger
- IgniteLogger igniteLogger = new Log4JLogger("log4j.xml");
- // Set Logger
- igniteConfiguration.setGridLogger(igniteLogger);
- }
- catch (IgniteCheckedException e)
- {
- e.printStackTrace();
- }
- // Activate Client Mode
- igniteConfiguration.setClientMode(true);
- // Declare tcp discovery spi
- TcpDiscoverySpi tcpDiscoverySpi = new TcpDiscoverySpi();
- // Declare discovery vm ip finder (static ip finder)
- TcpDiscoveryVmIpFinder tcpDiscoveryVmIpFinder = new TcpDiscoveryVmIpFinder();
- // Set addresses of vm ip finder
- tcpDiscoveryVmIpFinder.setAddresses(Arrays.asList("10.24.6.23:47500"));
- // Set the Discovery SPI Ip Finder
- tcpDiscoverySpi.setIpFinder(tcpDiscoveryVmIpFinder);
- // Override the default discovery SPI at the configuration object
- igniteConfiguration.setDiscoverySpi(tcpDiscoverySpi);
- /*
- // Ignite persistence configuration
- DataStorageConfiguration dataStorageConfiguration = new DataStorageConfiguration();
- // Enabling the persistence
- dataStorageConfiguration.getDefaultDataRegionConfiguration().setPersistenceEnabled(true);
- // Applying settings
- igniteConfiguration.setDataStorageConfiguration(dataStorageConfiguration);
- */
- System.out.println("---------------------------------------- STARTING ----------------------------------------");
- System.out.println("---------------------------------------- STARTING ----------------------------------------");
- System.out.println("---------------------------------------- STARTING ----------------------------------------");
- // Starting Ignite
- ignite = Ignition.getOrStart(igniteConfiguration);
- System.out.println("---------------------------------------- STARTED ----------------------------------------");
- System.out.println("---------------------------------------- STARTED ----------------------------------------");
- System.out.println("---------------------------------------- STARTED ----------------------------------------");
- /*
- // Activate Cluster
- ignite.cluster().active(true);
- */
- // Temporary Init
- IgniteCache<String, String> carCache = ignite.getOrCreateCache("Car");
- carCache.put("CarroEduardo", "{\"Brand\" : \"Bentley\"}");
- IgniteCache<String, String> addressCache = ignite.getOrCreateCache("Address");
- addressCache.put("CMAddress", "{\"Street\" : \"Rua sobe e desce\"}");
- }
- private static void LaunchSpark()
- {
- // Spark Session
- SparkSession sparkSession = SparkSession
- .builder()
- .appName("Spark Structured Streaming")
- .getOrCreate();
- // Streaming Read
- Dataset<Row> dataset = sparkSession
- .readStream()
- .format("kafka")
- .option("kafka.bootstrap.servers", "10.24.16.19:9093,10.24.16.19:9094,10.24.16.19:9095")
- .option("subscribePattern", ".*_raw")
- .load();
- // Kafka Specific Selection
- dataset = dataset.selectExpr("CAST(topic AS STRING)" , "CAST(key AS STRING)", "CAST(value AS STRING)");
- // Transformation
- StructType structType = new StructType();
- structType = structType.add("topic", DataTypes.StringType, false);
- structType = structType.add("key", DataTypes.StringType, true);
- structType = structType.add("value", DataTypes.StringType, false);
- ExpressionEncoder<Row> encoder = RowEncoder.apply(structType);
- MapFunction<Row, Row> myMapFunction = (row ->
- {
- // Get Values
- String rawTopic = row.getString(0);
- String value = row.getString(2);
- // Compute New Topic
- String eventName = rawTopic.substring(0, rawTopic.length() - 3);
- String enrichedTopic = eventName + "enriched";
- // Enrich
- JsonParser parser = new JsonParser();
- JsonObject jsonObject = parser.parse(value).getAsJsonObject();
- System.out.println("---------- PRE UPDATED VALUE ----------");
- System.out.println(jsonObject.toString());
- List<EventInfo> eventInfos = eventMap.get(eventName);
- if(eventInfos != null)
- {
- for (EventInfo eventInfo : eventInfos)
- {
- String propertyValue = gsonUtils.ExtractProperty(jsonObject, eventInfo.getPath());
- IgniteCache<String, String> cache = ignite.getOrCreateCache(eventInfo.getReferenceName());
- String cacheValue = cache.get(propertyValue);
- System.out.println("Cache Value " + cacheValue);
- System.out.println(eventInfo);
- System.out.println(propertyValue);
- System.out.println();
- if (cacheValue == null)
- {
- // Cache Miss
- System.out.println("Cache Miss");
- }
- else
- {
- System.out.println("---------- UPDATING ----------");
- System.out.println("UPDATE PATH: " + eventInfo.getPath());
- System.out.println("UPDATE VALUE: " + cacheValue);
- System.out.println(jsonObject.toString());
- gsonUtils.UpdateProperty(jsonObject, eventInfo.getPath(), cacheValue);
- }
- }
- }
- System.out.println("---------- POS UPDATED VALUE ----------");
- System.out.println(jsonObject.toString());
- String modifiedValue = jsonObject.toString();
- // Create
- return RowFactory.create(enrichedTopic, row.getString(1), modifiedValue);
- });
- dataset = dataset.map(myMapFunction, encoder).toDF("topic", "key", "value");
- // Streaming Write
- StreamingQuery streamingQuery = dataset
- .selectExpr("CAST(topic AS STRING)", "CAST(key AS STRING)", "CAST(value AS STRING)")
- .writeStream()
- .format("kafka")
- .option("kafka.bootstrap.servers", "10.24.16.19:9093,10.24.16.19:9094,10.24.16.19:9095")
- .option("checkpointLocation", "checkpoint")
- .start();
- // Loop
- try
- {
- streamingQuery.awaitTermination();
- }
- catch (StreamingQueryException e)
- {
- e.printStackTrace();
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement