Guest User

Untitled

a guest
Jul 30th, 2018
108
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.42 KB | None | 0 0
  1. package com.emasphere.poc.parquetspark;
  2.  
  import com.jcraft.jsch.JSch;
  import com.jcraft.jsch.Session;
  import org.apache.commons.io.IOUtils;
  import org.apache.spark.api.java.JavaSparkContext;
  import org.apache.spark.sql.Dataset;
  import org.apache.spark.sql.Row;
  import org.apache.spark.sql.SQLContext;
  import org.apache.spark.sql.SaveMode;
  import org.postgresql.Driver;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;

  import java.io.IOException;
  import java.io.InputStream;
  import java.sql.Connection;
  import java.sql.SQLException;
  import java.util.Properties;
  19.  
  20. /**
  21. * @author Sebastien Gerard
  22. */
  23. public class ParquetSqlImporter {
  24.  
  25. public static ParquetSqlImporter sqlImporter(String tableName,
  26. String query,
  27. JavaSparkContext context) {
  28. try {
  29. return new ParquetSqlImporter(tableName, query, context);
  30. } catch (Exception e) {
  31. throw new IllegalArgumentException("Cannot initialize the importer.", e);
  32. }
  33. }
  34.  
  35. public static final String LOCATION = "s3a://ema-data-lake/parquet-test/test.parquet";
  36.  
  37. private static final Logger logger = LoggerFactory.getLogger(ParquetSqlImporter.class);
  38.  
  39. private final String tableName;
  40. private final String query;
  41. private final JavaSparkContext context;
  42. private final Properties properties;
  43.  
  44. public ParquetSqlImporter(String tableName,
  45. String query,
  46. JavaSparkContext context) throws IOException {
  47. this.tableName = tableName;
  48. this.query = query;
  49. this.context = context;
  50.  
  51. this.properties = new Properties();
  52. this.properties.load(getClass().getResourceAsStream("/application.properties"));
  53. }
  54.  
  55. public void load() {
  56. final String url = "jdbc:postgresql://localhost:" + properties.getProperty("importer.bastion.local-port")
  57. + "/" + properties.getProperty("importer.db.name")
  58. + "?user=" + properties.getProperty("importer.db.user")
  59. + "&password=" + properties.getProperty("importer.db.password");
  60.  
  61.  
  62. final Session session = doSshTunnel(properties);
  63.  
  64. try {
  65. logDbDetails(url);
  66.  
  67. SQLContext
  68. .getOrCreate(context.sc())
  69. .read()
  70. .option("driver", Driver.class.getName())
  71. .jdbc(url, tableName, new Properties())
  72. .filter(query)
  73. .write()
  74. .mode(SaveMode.Append)
  75. .parquet(LOCATION);
  76. } finally {
  77. if (session != null) {
  78. session.disconnect();
  79. }
  80. }
  81. }
  82.  
  83. private void logDbDetails(String url) {
  84. logger.info("Configured DB URL [{}].", url);
  85.  
  86. try {
  87. final Connection connect = new Driver().connect(url, new Properties());
  88.  
  89. logger.info("Connection to Posgresql established, schema [{}].", connect.getSchema());
  90. } catch (SQLException e) {
  91. logger.error("Cannot connect to postgresql [" + e.getMessage() + "].", e);
  92. }
  93. }
  94.  
  95. private Session doSshTunnel(Properties properties) {
  96. try {
  97. final JSch jsch = new JSch();
  98. final Session session = jsch.getSession(
  99. properties.getProperty("importer.bastion.user"),
  100. properties.getProperty("importer.bastion.url"),
  101. Integer.valueOf(properties.getProperty("importer.bastion.port"))
  102. );
  103.  
  104. jsch.addIdentity(
  105. "eb-preprod",
  106. IOUtils.toByteArray(
  107. ParquetSqlImporter.class.getResource(properties.getProperty("importer.bastion.key-file"))
  108. ),
  109. null,
  110. null
  111. );
  112.  
  113. final Properties config = new Properties();
  114. config.put("StrictHostKeyChecking", "no");
  115. session.setConfig(config);
  116.  
  117. session.connect();
  118. session.setPortForwardingL(
  119. Integer.valueOf(properties.getProperty("importer.bastion.local-port")),
  120. properties.getProperty("importer.db.url"),
  121. Integer.valueOf(properties.getProperty("importer.db.port"))
  122. );
  123.  
  124. return session;
  125. } catch (Exception e) {
  126. logger.error("Cannot open SSH tunnel.", e);
  127.  
  128. return null;
  129. }
  130. }
  131.  
  132. }
Add Comment
Please, Sign In to add comment