Advertisement
Guest User

Untitled

a guest
Mar 3rd, 2019
116
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.14 KB | None | 0 0
  1. package com.ghcm.dwh.isdownloader.task;
  2.  
  3. import com.google.common.collect.ImmutableMap;
  4. import org.apache.spark.sql.*;
  5.  
  6. import java.util.Arrays;
  7. import java.util.HashMap;
  8. import java.util.Map;
  9. import java.util.Properties;
  10.  
  11. public class TableToORC implements Runnable {
  12. private static final int fetchSize = 3000;
  13.  
  14. private static final Map<String, String> JDBC_OPTS = ImmutableMap.of(
  15. "serverTimezone", "UTC",
  16. "useCursorFetch", "true",
  17. "useSSL", "false",
  18. "autoReconnect", "true",
  19. "zeroDateTimeBehavior", "convertToNull"
  20. );
  21.  
  22. private final TableProperties tableProperties;
  23. private final SparkSession sparkSession;
  24. private final InternalSite[] dataSources;
  25. private final Union unionTask;
  26. private final String unionPath;
  27.  
  28. public TableToORC(TableProperties tableProperties, SparkSession sparkSession, InternalSite[] dataSources, Union unionTask, String unionPath) {
  29. this.tableProperties = tableProperties;
  30. this.sparkSession = sparkSession;
  31. this.dataSources = dataSources;
  32. this.unionTask = unionTask;
  33. this.unionPath = unionPath;
  34. }
  35.  
  36. @Override
  37. public void run() {
  38. Arrays.stream(dataSources)
  39. .parallel()
  40. .map(this::copyTable)
  41. .reduce(unionTask::getUnionedDataSet)
  42. .ifPresent(this::writeData);
  43. }
  44.  
  45. private Dataset<Row> copyTable(InternalSite ds) {
  46. String schema;
  47. if (tableProperties.schema == TableProperties.Schema.INTERNAL) {
  48. schema = ds.internalSchema;
  49. } else {
  50. schema = ds.nobleSchema;
  51. }
  52. String url = "jdbc:" + ds.driverType.name().toLowerCase() + "://" + ds.host;
  53. Map<String, String> propertiesMap = new HashMap<>(JDBC_OPTS);
  54. propertiesMap.put("password", ds.password);
  55. propertiesMap.put("user", ds.username);
  56. propertiesMap.put("driver", ds.driverType.driver);
  57. Properties properties = new Properties();
  58. properties.putAll(propertiesMap);
  59. Dataset<Row> dataset = sparkSession.read().jdbc(url + "/" + schema, tableProperties.name, properties);
  60. return sparkSession.read()
  61. .option("fetchsize", fetchSize)
  62. .jdbc("$url/$schemaName", tableProperties.name, properties)
  63. .withColumn("Dw_Cmp_Id", functions.lit(ds.name));
  64. }
  65.  
  66. private void writeData(Dataset<Row> dataSet) {
  67. dataSet.repartition(1)
  68. .write()
  69. .mode(SaveMode.Overwrite)
  70. .orc(unionPath + "/" + tableProperties.orcName());
  71. }
  72.  
  73. static class InternalSite {
  74. private final String name;
  75. private final DriverType driverType;
  76. private final String host;
  77. private final String internalSchema;
  78. private final String nobleSchema;
  79. private final String username;
  80. private final String password;
  81.  
  82. public InternalSite(String name, DriverType driverType, String host, String internalSchema, String nobleSchema, String username, String password) {
  83. this.name = name;
  84. this.driverType = driverType;
  85. this.host = host;
  86. this.internalSchema = internalSchema;
  87. this.nobleSchema = nobleSchema;
  88. this.username = username;
  89. this.password = password;
  90. }
  91.  
  92. enum DriverType {
  93. MYSQL(com.mysql.cj.jdbc.Driver.class.getName()),
  94. PERVASIVE(com.pervasive.jdbc.v2.Driver.class.getName());
  95.  
  96. private final String driver;
  97.  
  98. DriverType(String driver) {
  99. this.driver = driver;
  100. }
  101. }
  102.  
  103.  
  104. public String toString() {
  105. return name;
  106. }
  107. }
  108.  
  109. static class TableProperties {
  110. private final Schema schema;
  111. private final String name;
  112.  
  113. public TableProperties(Schema schema, String name) {
  114. this.schema = schema;
  115. this.name = name;
  116. }
  117.  
  118. enum Schema {
  119. INTERNAL, NOBLE
  120. }
  121.  
  122. public String orcName() {
  123. return "dwh_" + name;
  124. }
  125.  
  126. public String toString() {
  127. return schema + "." + name;
  128. }
  129. }
  130. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement