Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- /**
- * This is an example of spark streaming function that
- * inserts data into Splice Machine using a VTI.
- *
- * @author Erin Driggers
- */
- public class SaveRDD implements Function<JavaRDD<String>, Void>, Externalizable {
- private static final Logger LOG = Logger.getLogger(SaveRDD.class);
- @Override
- public Void call(JavaRDD<String> rddRFIDMessages) throws Exception {
- LOG.debug("About to read results:");
- if (rddRFIDMessages != null && rddRFIDMessages.count() > 0) {
- LOG.debug("Data to process:");
- //Convert to list
- List<String> rfidMessages = rddRFIDMessages.collect();
- int numRcds = rfidMessages.size();
- if (numRcds > 0) {
- try {
- Connection con = DriverManager.getConnection("jdbc:splice://localhost:1527/splicedb;user=splice;password=admin");
- //Syntax for using a class instance in a VTI, this could also be a table function
- String vtiStatement = "INSERT INTO IOT.RFID "
- + "select s.* from new com.splicemachine.tutorials.sparkstreaming.mqtt.RFIDMessageVTI(?) s ("
- + RFIDMessage.getTableDefinition() + ")";
- PreparedStatement ps = con.prepareStatement(vtiStatement);
- ps.setObject(1, rfidMessages);
- ps.execute();
- } catch (Exception e) {
- //It is important to catch the exceptions as log messages because it is difficult
- //to trace what is happening otherwise
- LOG.error("Exception saving MQTT records to the database" + e.getMessage(), e);
- } finally {
- LOG.info("Complete insert into IOT.RFID");
- }
- }
- }
- return null;
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement