Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package com.ghcm.dwh.isdownloader.task;
import com.google.common.collect.ImmutableMap;
import org.apache.spark.sql.*;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
- public class TableToORC implements Runnable {
- private static final int fetchSize = 3000;
- private static final Map<String, String> JDBC_OPTS = ImmutableMap.of(
- "serverTimezone", "UTC",
- "useCursorFetch", "true",
- "useSSL", "false",
- "autoReconnect", "true",
- "zeroDateTimeBehavior", "convertToNull"
- );
- private final TableProperties tableProperties;
- private final SparkSession sparkSession;
- private final InternalSite[] dataSources;
- private final Union unionTask;
- private final String unionPath;
- public TableToORC(TableProperties tableProperties, SparkSession sparkSession, InternalSite[] dataSources, Union unionTask, String unionPath) {
- this.tableProperties = tableProperties;
- this.sparkSession = sparkSession;
- this.dataSources = dataSources;
- this.unionTask = unionTask;
- this.unionPath = unionPath;
- }
- @Override
- public void run() {
- Arrays.stream(dataSources)
- .parallel()
- .map(this::copyTable)
- .reduce(unionTask::getUnionedDataSet)
- .ifPresent(this::writeData);
- }
- private Dataset<Row> copyTable(InternalSite ds) {
- String schema;
- if (tableProperties.schema == TableProperties.Schema.INTERNAL) {
- schema = ds.internalSchema;
- } else {
- schema = ds.nobleSchema;
- }
- String url = "jdbc:" + ds.driverType.name().toLowerCase() + "://" + ds.host;
- Map<String, String> propertiesMap = new HashMap<>(JDBC_OPTS);
- propertiesMap.put("password", ds.password);
- propertiesMap.put("user", ds.username);
- propertiesMap.put("driver", ds.driverType.driver);
- Properties properties = new Properties();
- properties.putAll(propertiesMap);
- Dataset<Row> dataset = sparkSession.read().jdbc(url + "/" + schema, tableProperties.name, properties);
- return sparkSession.read()
- .option("fetchsize", fetchSize)
- .jdbc("$url/$schemaName", tableProperties.name, properties)
- .withColumn("Dw_Cmp_Id", functions.lit(ds.name));
- }
- private void writeData(Dataset<Row> dataSet) {
- dataSet.repartition(1)
- .write()
- .mode(SaveMode.Overwrite)
- .orc(unionPath + "/" + tableProperties.orcName());
- }
- static class InternalSite {
- private final String name;
- private final DriverType driverType;
- private final String host;
- private final String internalSchema;
- private final String nobleSchema;
- private final String username;
- private final String password;
- public InternalSite(String name, DriverType driverType, String host, String internalSchema, String nobleSchema, String username, String password) {
- this.name = name;
- this.driverType = driverType;
- this.host = host;
- this.internalSchema = internalSchema;
- this.nobleSchema = nobleSchema;
- this.username = username;
- this.password = password;
- }
- enum DriverType {
- MYSQL(com.mysql.cj.jdbc.Driver.class.getName()),
- PERVASIVE(com.pervasive.jdbc.v2.Driver.class.getName());
- private final String driver;
- DriverType(String driver) {
- this.driver = driver;
- }
- }
- public String toString() {
- return name;
- }
- }
- static class TableProperties {
- private final Schema schema;
- private final String name;
- public TableProperties(Schema schema, String name) {
- this.schema = schema;
- this.name = name;
- }
- enum Schema {
- INTERNAL, NOBLE
- }
- public String orcName() {
- return "dwh_" + name;
- }
- public String toString() {
- return schema + "." + name;
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement