Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package com.emasphere.poc.parquetspark;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.Session;

import org.apache.commons.io.IOUtils;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
import org.postgresql.Driver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Properties;
- /**
- * @author Sebastien Gerard
- */
- public class ParquetSqlImporter {
- public static ParquetSqlImporter sqlImporter(String tableName,
- String query,
- JavaSparkContext context) {
- try {
- return new ParquetSqlImporter(tableName, query, context);
- } catch (Exception e) {
- throw new IllegalArgumentException("Cannot initialize the importer.", e);
- }
- }
- public static final String LOCATION = "s3a://ema-data-lake/parquet-test/test.parquet";
- private static final Logger logger = LoggerFactory.getLogger(ParquetSqlImporter.class);
- private final String tableName;
- private final String query;
- private final JavaSparkContext context;
- private final Properties properties;
- public ParquetSqlImporter(String tableName,
- String query,
- JavaSparkContext context) throws IOException {
- this.tableName = tableName;
- this.query = query;
- this.context = context;
- this.properties = new Properties();
- this.properties.load(getClass().getResourceAsStream("/application.properties"));
- }
- public void load() {
- final String url = "jdbc:postgresql://localhost:" + properties.getProperty("importer.bastion.local-port")
- + "/" + properties.getProperty("importer.db.name")
- + "?user=" + properties.getProperty("importer.db.user")
- + "&password=" + properties.getProperty("importer.db.password");
- final Session session = doSshTunnel(properties);
- try {
- logDbDetails(url);
- SQLContext
- .getOrCreate(context.sc())
- .read()
- .option("driver", Driver.class.getName())
- .jdbc(url, tableName, new Properties())
- .filter(query)
- .write()
- .mode(SaveMode.Append)
- .parquet(LOCATION);
- } finally {
- if (session != null) {
- session.disconnect();
- }
- }
- }
- private void logDbDetails(String url) {
- logger.info("Configured DB URL [{}].", url);
- try {
- final Connection connect = new Driver().connect(url, new Properties());
- logger.info("Connection to Posgresql established, schema [{}].", connect.getSchema());
- } catch (SQLException e) {
- logger.error("Cannot connect to postgresql [" + e.getMessage() + "].", e);
- }
- }
- private Session doSshTunnel(Properties properties) {
- try {
- final JSch jsch = new JSch();
- final Session session = jsch.getSession(
- properties.getProperty("importer.bastion.user"),
- properties.getProperty("importer.bastion.url"),
- Integer.valueOf(properties.getProperty("importer.bastion.port"))
- );
- jsch.addIdentity(
- "eb-preprod",
- IOUtils.toByteArray(
- ParquetSqlImporter.class.getResource(properties.getProperty("importer.bastion.key-file"))
- ),
- null,
- null
- );
- final Properties config = new Properties();
- config.put("StrictHostKeyChecking", "no");
- session.setConfig(config);
- session.connect();
- session.setPortForwardingL(
- Integer.valueOf(properties.getProperty("importer.bastion.local-port")),
- properties.getProperty("importer.db.url"),
- Integer.valueOf(properties.getProperty("importer.db.port"))
- );
- return session;
- } catch (Exception e) {
- logger.error("Cannot open SSH tunnel.", e);
- return null;
- }
- }
- }
Add Comment
Please, Sign In to add comment