Advertisement
Guest User

Untitled

a guest
Apr 3rd, 2017
105
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.82 KB | None | 0 0
  1. /**
  2. * This is an example of spark streaming function that
  3. * inserts data into Splice Machine using a VTI.
  4. *
  5. * @author Erin Driggers
  6. */
  7.  
  8. public class SaveRDD implements Function<JavaRDD<String>, Void>, Externalizable {
  9.  
  10. private static final Logger LOG = Logger.getLogger(SaveRDD.class);
  11.  
  12. @Override
  13. public Void call(JavaRDD<String> rddRFIDMessages) throws Exception {
  14. LOG.debug("About to read results:");
  15. if (rddRFIDMessages != null && rddRFIDMessages.count() > 0) {
  16. LOG.debug("Data to process:");
  17. //Convert to list
  18. List<String> rfidMessages = rddRFIDMessages.collect();
  19. int numRcds = rfidMessages.size();
  20.  
  21. if (numRcds > 0) {
  22. try {
  23. Connection con = DriverManager.getConnection("jdbc:splice://localhost:1527/splicedb;user=splice;password=admin");
  24.  
  25. //Syntax for using a class instance in a VTI, this could also be a table function
  26. String vtiStatement = "INSERT INTO IOT.RFID "
  27. + "select s.* from new com.splicemachine.tutorials.sparkstreaming.mqtt.RFIDMessageVTI(?) s ("
  28. + RFIDMessage.getTableDefinition() + ")";
  29. PreparedStatement ps = con.prepareStatement(vtiStatement);
  30. ps.setObject(1, rfidMessages);
  31. ps.execute();
  32. } catch (Exception e) {
  33. //It is important to catch the exceptions as log messages because it is difficult
  34. //to trace what is happening otherwise
  35. LOG.error("Exception saving MQTT records to the database" + e.getMessage(), e);
  36. } finally {
  37. LOG.info("Complete insert into IOT.RFID");
  38. }
  39. }
  40. }
  41. return null;
  42. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement