SHARE
TWEET

Untitled

a guest Oct 14th, 2019 84 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. /* Copyright (c) 2019 Travelport. All rights reserved. */
  2. package com.travelport.brm.util;
  3.  
  4. import java.io.IOException;
  5. import java.sql.Connection;
  6. import java.sql.DriverManager;
  7. import java.sql.SQLException;
  8. import java.sql.Statement;
  9. import java.text.SimpleDateFormat;
  10. import java.util.Calendar;
  11. import java.util.Date;
  12. import java.util.Properties;
  13.  
  14. import org.apache.hadoop.conf.Configuration;
  15. import org.apache.hadoop.fs.FileSystem;
  16. import org.apache.hadoop.fs.Path;
  17. import org.apache.hadoop.fs.permission.FsAction;
  18. import org.apache.hadoop.fs.permission.FsPermission;
  19. import org.apache.hadoop.security.UserGroupInformation;
  20. import org.apache.storm.hdfs.common.rotation.RotationAction;
  21. import org.slf4j.Logger;
  22. import org.slf4j.LoggerFactory;
  23.  
  24. public class MoveHDFSFileAction implements RotationAction {
  25.     /**
  26.      *
  27.      */
  28.     private static final long serialVersionUID = -6038343142138479406L;
  29.     private static final Logger LOG = LoggerFactory.getLogger(MoveHDFSFileAction.class);
  30.  
  31.     private Properties configs;
  32.     private String hiveTableName;
  33.     private String databaseName;
  34.  
  35.     public MoveHDFSFileAction(Properties configs, String databaseName, String hiveTableName) {
  36.         this.configs = configs;
  37.         this.hiveTableName = hiveTableName;
  38.         this.databaseName = databaseName;
  39.     }
  40.  
  41.     private String constructHourPartitionName(Date date) {
  42.         Calendar calendar = Calendar.getInstance();
  43.         calendar.setTime(date);
  44.         String hour = String.valueOf(calendar.get(Calendar.HOUR_OF_DAY));
  45.         if (hour.length() < 2) {
  46.             hour = "0" + hour;
  47.         }
  48.         return hour;
  49.     }
  50.  
  51.     private String constructDatePartitionName(Date date) {
  52.         SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMdd");
  53.         return dateFormat.format(date);
  54.     }
  55.  
  56.     /**
  57.      * Creates the destination directory, adds the hive partition if not exist and move the file to destination directory.
  58.      *
  59.      */
  60.     @Override
  61.     public void execute(FileSystem fileSystem, Path filePath) throws IOException {
  62.  
  63.         Configuration conf = new Configuration();
  64.         conf.set("hadoop.security.authentication", "Kerberos");
  65.         UserGroupInformation.setConfiguration(conf);
  66.         try {
  67.             UserGroupInformation.loginUserFromKeytab("brmhive@TVL.COM", "/etc/security/keytabs/brmhive.headless.keytab");
  68.         } catch (IOException e) {
  69.             LOG.error("Error while setting UserGroupInformation: "+e);
  70.         }
  71.  
  72.         LOG.debug("In MoveHDFSFileAction: FilePath:{}", filePath);
  73.  
  74.         String stormRootPath = configs.getProperty(CommonConstants.STORM_ROOT_PATH);
  75.  
  76.         Date date = new Date(System.currentTimeMillis());
  77.  
  78.         String datePartitionName = constructDatePartitionName(date);
  79.         String hourPartitionName = constructHourPartitionName(date);
  80.  
  81.         String hdfsDestDir = stormRootPath + databaseName + "/" + hiveTableName + "/ing_dt=" + datePartitionName + "/ing_hr="
  82.                 + hourPartitionName;
  83.  
  84.         Path destPath = new Path(hdfsDestDir, filePath.getName());
  85.  
  86.         Path destDir = new Path(hdfsDestDir);
  87.         LOG.debug("destinaltion path and destDir  {}", destDir);
  88.  
  89.         if (!fileSystem.exists(destDir)) {
  90.  
  91.             boolean result = FileSystem.mkdirs(fileSystem, destDir, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
  92.  
  93.             if (result) {
  94.                 LOG.debug("Created directory: {}", destDir);
  95.             } else {
  96.                 LOG.debug("Directory not created");
  97.             }
  98.         } else {
  99.             LOG.debug("Directory Already Exists");
  100.         }
  101.  
  102.         boolean success = fileSystem.rename(filePath, destPath);
  103.         if (success) {
  104.             addHivePartition(hiveTableName, datePartitionName, hourPartitionName, hdfsDestDir);
  105.             LOG.info("File successfully moved to destination from {} to {}", filePath, destPath);
  106.         } else {
  107.             LOG.error("Could not move file from {} to {}.", filePath, destPath);
  108.         }
  109.         return;
  110.  
  111.     }
  112.  
  113.     private void addHivePartition(String hiveTableName, String datePartitionName, String hourPartitionName, String hdfsDestDir) {
  114.  
  115.         String ddl = "ALTER TABLE " + databaseName + "." + hiveTableName + " ADD IF NOT EXISTS PARTITION (ing_dt='"
  116.                 + datePartitionName
  117.                 + "', ing_hr='" + hourPartitionName + "') location '" + hdfsDestDir + "'";
  118.  
  119.         try {
  120.             execHiveDDL(ddl);
  121.         } catch (Exception e) {
  122.             LOG.error("Failed ddl {}\n", ddl, e);
  123.         }
  124.     }
  125.  
  126.     private void execHiveDDL(String ddl) throws ClassNotFoundException, SQLException {
  127.  
  128.         LOG.debug("Executing ddl = " + ddl);
  129.  
  130.         Class.forName(configs.getProperty(CommonConstants.DRIVER_NAME));
  131.         Connection con = null;
  132.         Statement stmt = null;
  133.         try {
  134.             con = DriverManager.getConnection(configs.getProperty(CommonConstants.JDBC_URL));
  135.             stmt = con.createStatement();
  136.             stmt.execute(ddl);
  137.         } finally {
  138.             if (stmt != null) {
  139.                 try {
  140.                     stmt.close();
  141.                 } catch (SQLException ex) {
  142.                     LOG.error("Error occured while closing the SQL Statement", ex);
  143.                 }
  144.             }
  145.             if (con != null) {
  146.                 try {
  147.                     con.close();
  148.                 } catch (SQLException ex) {
  149.                     LOG.error("Error occured while closing Hive connection", ex);
  150.                 }
  151.             }
  152.         }
  153.  
  154.     }
  155. }
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top