Guest User

Untitled

a guest
Jul 30th, 2018
119
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 8.73 KB | None | 0 0
package com.emasphere.poc.hbase;

import com.emasphere.data.executor.common.DataFormatUtils;
import com.emasphere.data.executor.common.utils.FlowExecutorUtils;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.Session;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.postgresql.Driver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.math.BigDecimal;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.Date;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.Properties;

import static org.apache.hadoop.hbase.HConstants.*;
import static org.apache.hadoop.hbase.mapreduce.TableInputFormat.*;
import static org.apache.hadoop.hbase.mapreduce.TableOutputFormat.*;
import static org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.*;

  43. /**
  44. * @author Sebastien Gerard
  45. */
  46. public class HBaseSqlImporter {
  47.  
  48. public static HBaseSqlImporter sqlImporter(String tableName,
  49. String query,
  50. JavaSparkContext context) {
  51. try {
  52. return new HBaseSqlImporter(tableName, query, context);
  53. } catch (Exception e) {
  54. throw new IllegalArgumentException("Cannot initialize the importer.", e);
  55. }
  56. }
  57.  
  58. private static final Logger logger = LoggerFactory.getLogger(HBaseSqlImporter.class);
  59.  
  60. private final String tableName;
  61. private final String query;
  62. private final JavaSparkContext context;
  63. private final Properties properties;
  64.  
  65. public HBaseSqlImporter(String tableName,
  66. String query,
  67. JavaSparkContext context) throws IOException {
  68. this.tableName = tableName;
  69. this.query = query;
  70. this.context = context;
  71.  
  72. this.properties = new Properties();
  73. this.properties.load(getClass().getResourceAsStream("/application.properties"));
  74. }
  75.  
  76. public void load() throws MalformedURLException {
  77. final String url = "jdbc:postgresql://localhost:" + properties.getProperty("importer.bastion.local-port")
  78. + "/" + properties.getProperty("importer.db.name")
  79. + "?user=" + properties.getProperty("importer.db.user")
  80. + "&password=" + properties.getProperty("importer.db.password");
  81.  
  82.  
  83. final Session session = doSshTunnel(properties);
  84.  
  85. try {
  86. logDbDetails(url);
  87.  
  88. final Configuration configuration = initializeConfiguration();
  89.  
  90. Dataset<Row> sqlDataSet = SQLContext
  91. .getOrCreate(context.sc())
  92. .read()
  93. .option("driver", Driver.class.getName())
  94. .jdbc(url, tableName, new Properties());
  95.  
  96. if (query != null) {
  97. sqlDataSet = sqlDataSet.filter(query);
  98. }
  99.  
  100. final Mapper func = new Mapper();
  101.  
  102. sqlDataSet
  103. .toJavaRDD()
  104. .mapToPair(func)
  105. .saveAsNewAPIHadoopFile(
  106. FlowExecutorUtils.TABLE,
  107. ImmutableBytesWritable.class,
  108. Result.class,
  109. TableOutputFormat.class,
  110. new JobConf(configuration)
  111. );
  112. } finally {
  113. if (session != null) {
  114. session.disconnect();
  115. }
  116. }
  117. }
  118.  
  119. private Configuration initializeConfiguration() throws MalformedURLException {
  120. final Configuration configuration = HBaseConfiguration.create();
  121.  
  122. final URL url = new URL("file:///etc/hbase/conf/hbase-site.xml");
  123.  
  124. if (new File(url.getFile()).exists()) {
  125. logger.info("Loading HBase configuration, found {}.", new File(url.getFile()).exists());
  126.  
  127. configuration.addResource(url);
  128. } else {
  129. logger.info("HBase configuration file [{}] does not exist.", url);
  130. }
  131.  
  132. configuration.set(INPUT_TABLE, "flow");
  133. configuration.set(OUTDIR, "flow");
  134. configuration.set(OUTPUT_TABLE, "flow");
  135. configuration.set(ZOOKEEPER_QUORUM, "localhost");
  136. configuration.set("hbase.zookeeper.property.clientPort", "2181");
  137.  
  138. return configuration;
  139. }
  140.  
  141. private void logDbDetails(String url) {
  142. logger.info("Configured DB URL [{}].", url);
  143.  
  144. try {
  145. final Connection connect = new Driver().connect(url, new Properties());
  146.  
  147. logger.info("Connection to Posgresql established, schema [{}].", connect.getSchema());
  148. } catch (SQLException e) {
  149. logger.error("Cannot connect to postgresql [" + e.getMessage() + "].", e);
  150. }
  151. }
  152.  
  153. private Session doSshTunnel(Properties properties) {
  154. try {
  155. final JSch jsch = new JSch();
  156. final Session session = jsch.getSession(
  157. properties.getProperty("importer.bastion.user"),
  158. properties.getProperty("importer.bastion.url"),
  159. Integer.valueOf(properties.getProperty("importer.bastion.port"))
  160. );
  161.  
  162. jsch.addIdentity(
  163. "eb-preprod",
  164. IOUtils.toByteArray(
  165. HBaseSqlImporter.class.getResource(properties.getProperty("importer.bastion.key-file"))
  166. ),
  167. null,
  168. null
  169. );
  170.  
  171. final Properties config = new Properties();
  172. config.put("StrictHostKeyChecking", "no");
  173. session.setConfig(config);
  174.  
  175. session.connect();
  176. session.setPortForwardingL(
  177. Integer.valueOf(properties.getProperty("importer.bastion.local-port")),
  178. properties.getProperty("importer.db.url"),
  179. Integer.valueOf(properties.getProperty("importer.db.port"))
  180. );
  181.  
  182. return session;
  183. } catch (Exception e) {
  184. logger.error("Cannot open SSH tunnel.", e);
  185.  
  186. return null;
  187. }
  188. }
  189.  
  190. private static class Mapper implements PairFunction<Row, ImmutableBytesWritable, Mutation> {
  191.  
  192. @Override
  193. public Tuple2<ImmutableBytesWritable, Mutation> call(Row row) {
  194. final byte[] rowKey = concat(
  195. DataFormatUtils.toBytes(row.getDecimal(row.fieldIndex("id"))),
  196. DataFormatUtils.toBytes(row.getString(row.fieldIndex("i")))
  197. );
  198.  
  199. final Put put = new Put(rowKey);
  200.  
  201. for (int i = 0; i < row.size(); i++) {
  202. final Object value = row.get(i);
  203.  
  204. if (value == null) {
  205. continue;
  206. }
  207.  
  208. final String name = row.schema().fieldNames()[i];
  209.  
  210. final byte[] bytesValue;
  211. if (value instanceof BigDecimal) {
  212. bytesValue = DataFormatUtils.toBytes((BigDecimal) value);
  213. } else if (value instanceof Date) {
  214. bytesValue = DataFormatUtils.toBytes(((Date) value).toLocalDate());
  215. } else if (value instanceof Timestamp) {
  216. bytesValue = DataFormatUtils.toBytes(((Timestamp) value).toLocalDateTime());
  217. } else if (value instanceof Boolean) {
  218. bytesValue = DataFormatUtils.toBytes((Boolean) value);
  219. } else if (value instanceof String) {
  220. bytesValue = DataFormatUtils.toBytes((String) value);
  221. } else {
  222. throw new UnsupportedOperationException("Unsupported type [" + value.getClass() + "].");
  223. }
  224.  
  225. put.addColumn(
  226. "d".getBytes(),
  227. name.getBytes(),
  228. 0,
  229. bytesValue
  230. );
  231. }
  232.  
  233. return Tuple2.apply(new ImmutableBytesWritable(rowKey), put);
  234. }
  235.  
  236. public byte[] concat(byte[] first, byte[] second) {
  237. byte[] both = Arrays.copyOf(first, first.length + second.length);
  238. System.arraycopy(second, 0, both, first.length, second.length);
  239. return both;
  240. }
  241. }
  242. }
Add Comment
Please, Sign In to add comment