package com.emasphere.poc.hbase;

import com.emasphere.data.executor.common.DataFormatUtils;
import com.emasphere.data.executor.common.utils.FlowExecutorUtils;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.Session;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.postgresql.Driver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.math.BigDecimal;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.Date;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.Properties;

import static org.apache.hadoop.hbase.HConstants.*;
import static org.apache.hadoop.hbase.mapreduce.TableInputFormat.*;
import static org.apache.hadoop.hbase.mapreduce.TableOutputFormat.*;
import static org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.*;
/**
 * Imports the rows of a SQL table from a PostgreSQL database (reached through
 * an SSH tunnel) into an HBase table using Spark.
 *
 * @author Sebastien Gerard
 */
public class HBaseSqlImporter {

    /**
     * Creates a new importer for the given SQL table, wrapping any
     * initialization failure in an unchecked exception.
     */
    public static HBaseSqlImporter sqlImporter(String tableName,
                                               String query,
                                               JavaSparkContext context) {
        try {
            return new HBaseSqlImporter(tableName, query, context);
        } catch (Exception e) {
            throw new IllegalArgumentException("Cannot initialize the importer.", e);
        }
    }
    private static final Logger logger = LoggerFactory.getLogger(HBaseSqlImporter.class);

    private final String tableName;
    private final String query;
    private final JavaSparkContext context;
    private final Properties properties;

    public HBaseSqlImporter(String tableName,
                            String query,
                            JavaSparkContext context) throws IOException {
        this.tableName = tableName;
        this.query = query;
        this.context = context;
        this.properties = new Properties();

        // Close the classpath stream once the properties are loaded.
        try (InputStream input = getClass().getResourceAsStream("/application.properties")) {
            this.properties.load(input);
        }
    }
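    /**
     * Opens the SSH tunnel, reads the SQL table through the Spark JDBC source
     * (optionally filtered by the configured query) and writes every row to
     * HBase via {@link TableOutputFormat}.
     */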
    public void load() throws MalformedURLException {
        final String url = "jdbc:postgresql://localhost:" + properties.getProperty("importer.bastion.local-port")
                + "/" + properties.getProperty("importer.db.name")
                + "?user=" + properties.getProperty("importer.db.user")
                + "&password=" + properties.getProperty("importer.db.password");

        final Session session = doSshTunnel(properties);
        try {
            logDbDetails(url);

            final Configuration configuration = initializeConfiguration();

            Dataset<Row> sqlDataSet = SQLContext
                    .getOrCreate(context.sc())
                    .read()
                    .option("driver", Driver.class.getName())
                    .jdbc(url, tableName, new Properties());

            if (query != null) {
                sqlDataSet = sqlDataSet.filter(query);
            }

            final Mapper func = new Mapper();
            sqlDataSet
                    .toJavaRDD()
                    .mapToPair(func)
                    .saveAsNewAPIHadoopFile(
                            FlowExecutorUtils.TABLE,
                            ImmutableBytesWritable.class,
                            // The mapper emits Mutation values, so declare Mutation (not Result)
                            // and pass the new-API Configuration directly instead of a JobConf.
                            Mutation.class,
                            TableOutputFormat.class,
                            configuration
                    );
        } finally {
            if (session != null) {
                session.disconnect();
            }
        }
    }
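    /**
     * Builds the HBase client configuration: loads the cluster descriptor from
     * /etc/hbase/conf when present and targets the "flow" table for both input
     * and output.
     */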
    private Configuration initializeConfiguration() throws MalformedURLException {
        final Configuration configuration = HBaseConfiguration.create();

        final URL url = new URL("file:///etc/hbase/conf/hbase-site.xml");
        if (new File(url.getFile()).exists()) {
            logger.info("Loading HBase configuration from [{}].", url);
            configuration.addResource(url);
        } else {
            logger.info("HBase configuration file [{}] does not exist.", url);
        }

        configuration.set(INPUT_TABLE, "flow");
        configuration.set(OUTDIR, "flow");
        configuration.set(OUTPUT_TABLE, "flow");
        configuration.set(ZOOKEEPER_QUORUM, "localhost");
        configuration.set("hbase.zookeeper.property.clientPort", "2181");

        return configuration;
    }
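    /**
     * Checks that the database is reachable and logs the connection details.
     */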
    private void logDbDetails(String url) {
        logger.info("Configured DB URL [{}].", url);

        // Use try-with-resources so the probe connection is always closed.
        try (Connection connection = new Driver().connect(url, new Properties())) {
            logger.info("Connection to PostgreSQL established, schema [{}].", connection.getSchema());
        } catch (SQLException e) {
            logger.error("Cannot connect to PostgreSQL [" + e.getMessage() + "].", e);
        }
    }
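    /**
     * Opens an SSH tunnel through the bastion host and forwards the configured
     * local port to the database, or returns {@code null} if the tunnel cannot
     * be established.
     */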
    private Session doSshTunnel(Properties properties) {
        try {
            final JSch jsch = new JSch();

            final Session session = jsch.getSession(
                    properties.getProperty("importer.bastion.user"),
                    properties.getProperty("importer.bastion.url"),
                    Integer.valueOf(properties.getProperty("importer.bastion.port"))
            );

            jsch.addIdentity(
                    "eb-preprod",
                    IOUtils.toByteArray(
                            HBaseSqlImporter.class.getResource(properties.getProperty("importer.bastion.key-file"))
                    ),
                    null,
                    null
            );

            final Properties config = new Properties();
            config.put("StrictHostKeyChecking", "no");
            session.setConfig(config);
            session.connect();

            session.setPortForwardingL(
                    Integer.valueOf(properties.getProperty("importer.bastion.local-port")),
                    properties.getProperty("importer.db.url"),
                    Integer.valueOf(properties.getProperty("importer.db.port"))
            );

            return session;
        } catch (Exception e) {
            logger.error("Cannot open SSH tunnel.", e);
            return null;
        }
    }
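    /**
     * Maps a SQL row to an HBase {@link Put}: the row key is the "id" column
     * concatenated with the "i" column, and every non-null column is written
     * to the "d" family under its SQL column name.
     */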
    private static class Mapper implements PairFunction<Row, ImmutableBytesWritable, Mutation> {

        @Override
        public Tuple2<ImmutableBytesWritable, Mutation> call(Row row) {
            final byte[] rowKey = concat(
                    DataFormatUtils.toBytes(row.getDecimal(row.fieldIndex("id"))),
                    DataFormatUtils.toBytes(row.getString(row.fieldIndex("i")))
            );

            final Put put = new Put(rowKey);
            for (int i = 0; i < row.size(); i++) {
                final Object value = row.get(i);
                if (value == null) {
                    continue;
                }

                final String name = row.schema().fieldNames()[i];

                final byte[] bytesValue;
                if (value instanceof BigDecimal) {
                    bytesValue = DataFormatUtils.toBytes((BigDecimal) value);
                } else if (value instanceof Date) {
                    bytesValue = DataFormatUtils.toBytes(((Date) value).toLocalDate());
                } else if (value instanceof Timestamp) {
                    bytesValue = DataFormatUtils.toBytes(((Timestamp) value).toLocalDateTime());
                } else if (value instanceof Boolean) {
                    bytesValue = DataFormatUtils.toBytes((Boolean) value);
                } else if (value instanceof String) {
                    bytesValue = DataFormatUtils.toBytes((String) value);
                } else {
                    throw new UnsupportedOperationException("Unsupported type [" + value.getClass() + "].");
                }

                // Use an explicit charset instead of the platform default; every
                // cell is written with a fixed timestamp of 0.
                put.addColumn(
                        "d".getBytes(StandardCharsets.UTF_8),
                        name.getBytes(StandardCharsets.UTF_8),
                        0,
                        bytesValue
                );
            }

            return Tuple2.apply(new ImmutableBytesWritable(rowKey), put);
        }

        private static byte[] concat(byte[] first, byte[] second) {
            final byte[] both = Arrays.copyOf(first, first.length + second.length);
            System.arraycopy(second, 0, both, first.length, second.length);
            return both;
        }
    }
}
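// Usage sketch, assuming a local Spark context; the table name "customer" and
// the filter expression are hypothetical placeholders.
//
// try (JavaSparkContext context =
//         new JavaSparkContext(new SparkConf().setAppName("hbase-sql-import").setMaster("local[*]"))) {
//     HBaseSqlImporter.sqlImporter("customer", "amount > 0", context).load();
// }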