Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- /* Copyright (c) 2019 Travelport. All rights reserved. */
- package com.travelport.brm.util;
- import java.io.IOException;
- import java.sql.Connection;
- import java.sql.DriverManager;
- import java.sql.SQLException;
- import java.sql.Statement;
- import java.text.SimpleDateFormat;
- import java.util.Calendar;
- import java.util.Date;
- import java.util.Properties;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.fs.permission.FsAction;
- import org.apache.hadoop.fs.permission.FsPermission;
- import org.apache.hadoop.security.UserGroupInformation;
- import org.apache.storm.hdfs.common.rotation.RotationAction;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- public class MoveHDFSFileAction implements RotationAction {
- /**
- *
- */
- private static final long serialVersionUID = -6038343142138479406L;
- private static final Logger LOG = LoggerFactory.getLogger(MoveHDFSFileAction.class);
- private Properties configs;
- private String hiveTableName;
- private String databaseName;
- public MoveHDFSFileAction(Properties configs, String databaseName, String hiveTableName) {
- this.configs = configs;
- this.hiveTableName = hiveTableName;
- this.databaseName = databaseName;
- }
- private String constructHourPartitionName(Date date) {
- Calendar calendar = Calendar.getInstance();
- calendar.setTime(date);
- String hour = String.valueOf(calendar.get(Calendar.HOUR_OF_DAY));
- if (hour.length() < 2) {
- hour = "0" + hour;
- }
- return hour;
- }
- private String constructDatePartitionName(Date date) {
- SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMdd");
- return dateFormat.format(date);
- }
- /**
- * Creates the destination directory, adds the hive partition if not exist and move the file to destination directory.
- *
- */
- @Override
- public void execute(FileSystem fileSystem, Path filePath) throws IOException {
- Configuration conf = new Configuration();
- conf.set("hadoop.security.authentication", "Kerberos");
- UserGroupInformation.setConfiguration(conf);
- try {
- UserGroupInformation.loginUserFromKeytab("brmhive@TVL.COM", "/etc/security/keytabs/brmhive.headless.keytab");
- } catch (IOException e) {
- LOG.error("Error while setting UserGroupInformation: "+e);
- }
- LOG.debug("In MoveHDFSFileAction: FilePath:{}", filePath);
- String stormRootPath = configs.getProperty(CommonConstants.STORM_ROOT_PATH);
- Date date = new Date(System.currentTimeMillis());
- String datePartitionName = constructDatePartitionName(date);
- String hourPartitionName = constructHourPartitionName(date);
- String hdfsDestDir = stormRootPath + databaseName + "/" + hiveTableName + "/ing_dt=" + datePartitionName + "/ing_hr="
- + hourPartitionName;
- Path destPath = new Path(hdfsDestDir, filePath.getName());
- Path destDir = new Path(hdfsDestDir);
- LOG.debug("destinaltion path and destDir {}", destDir);
- if (!fileSystem.exists(destDir)) {
- boolean result = FileSystem.mkdirs(fileSystem, destDir, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
- if (result) {
- LOG.debug("Created directory: {}", destDir);
- } else {
- LOG.debug("Directory not created");
- }
- } else {
- LOG.debug("Directory Already Exists");
- }
- boolean success = fileSystem.rename(filePath, destPath);
- if (success) {
- addHivePartition(hiveTableName, datePartitionName, hourPartitionName, hdfsDestDir);
- LOG.info("File successfully moved to destination from {} to {}", filePath, destPath);
- } else {
- LOG.error("Could not move file from {} to {}.", filePath, destPath);
- }
- return;
- }
- private void addHivePartition(String hiveTableName, String datePartitionName, String hourPartitionName, String hdfsDestDir) {
- String ddl = "ALTER TABLE " + databaseName + "." + hiveTableName + " ADD IF NOT EXISTS PARTITION (ing_dt='"
- + datePartitionName
- + "', ing_hr='" + hourPartitionName + "') location '" + hdfsDestDir + "'";
- try {
- execHiveDDL(ddl);
- } catch (Exception e) {
- LOG.error("Failed ddl {}\n", ddl, e);
- }
- }
- private void execHiveDDL(String ddl) throws ClassNotFoundException, SQLException {
- LOG.debug("Executing ddl = " + ddl);
- Class.forName(configs.getProperty(CommonConstants.DRIVER_NAME));
- Connection con = null;
- Statement stmt = null;
- try {
- con = DriverManager.getConnection(configs.getProperty(CommonConstants.JDBC_URL));
- stmt = con.createStatement();
- stmt.execute(ddl);
- } finally {
- if (stmt != null) {
- try {
- stmt.close();
- } catch (SQLException ex) {
- LOG.error("Error occured while closing the SQL Statement", ex);
- }
- }
- if (con != null) {
- try {
- con.close();
- } catch (SQLException ex) {
- LOG.error("Error occured while closing Hive connection", ex);
- }
- }
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement