Advertisement
Guest User

Untitled

a guest
Oct 14th, 2019
132
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.56 KB | None | 0 0
  1. /* Copyright (c) 2019 Travelport. All rights reserved. */
  2. package com.travelport.brm.util;
  3.  
  4. import java.io.IOException;
  5. import java.sql.Connection;
  6. import java.sql.DriverManager;
  7. import java.sql.SQLException;
  8. import java.sql.Statement;
  9. import java.text.SimpleDateFormat;
  10. import java.util.Calendar;
  11. import java.util.Date;
  12. import java.util.Properties;
  13.  
  14. import org.apache.hadoop.conf.Configuration;
  15. import org.apache.hadoop.fs.FileSystem;
  16. import org.apache.hadoop.fs.Path;
  17. import org.apache.hadoop.fs.permission.FsAction;
  18. import org.apache.hadoop.fs.permission.FsPermission;
  19. import org.apache.hadoop.security.UserGroupInformation;
  20. import org.apache.storm.hdfs.common.rotation.RotationAction;
  21. import org.slf4j.Logger;
  22. import org.slf4j.LoggerFactory;
  23.  
  24. public class MoveHDFSFileAction implements RotationAction {
  25. /**
  26. *
  27. */
  28. private static final long serialVersionUID = -6038343142138479406L;
  29. private static final Logger LOG = LoggerFactory.getLogger(MoveHDFSFileAction.class);
  30.  
  31. private Properties configs;
  32. private String hiveTableName;
  33. private String databaseName;
  34.  
  35. public MoveHDFSFileAction(Properties configs, String databaseName, String hiveTableName) {
  36. this.configs = configs;
  37. this.hiveTableName = hiveTableName;
  38. this.databaseName = databaseName;
  39. }
  40.  
  41. private String constructHourPartitionName(Date date) {
  42. Calendar calendar = Calendar.getInstance();
  43. calendar.setTime(date);
  44. String hour = String.valueOf(calendar.get(Calendar.HOUR_OF_DAY));
  45. if (hour.length() < 2) {
  46. hour = "0" + hour;
  47. }
  48. return hour;
  49. }
  50.  
  51. private String constructDatePartitionName(Date date) {
  52. SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMdd");
  53. return dateFormat.format(date);
  54. }
  55.  
  56. /**
  57. * Creates the destination directory, adds the hive partition if not exist and move the file to destination directory.
  58. *
  59. */
  60. @Override
  61. public void execute(FileSystem fileSystem, Path filePath) throws IOException {
  62.  
  63. Configuration conf = new Configuration();
  64. conf.set("hadoop.security.authentication", "Kerberos");
  65. UserGroupInformation.setConfiguration(conf);
  66. try {
  67. UserGroupInformation.loginUserFromKeytab("brmhive@TVL.COM", "/etc/security/keytabs/brmhive.headless.keytab");
  68. } catch (IOException e) {
  69. LOG.error("Error while setting UserGroupInformation: "+e);
  70. }
  71.  
  72. LOG.debug("In MoveHDFSFileAction: FilePath:{}", filePath);
  73.  
  74. String stormRootPath = configs.getProperty(CommonConstants.STORM_ROOT_PATH);
  75.  
  76. Date date = new Date(System.currentTimeMillis());
  77.  
  78. String datePartitionName = constructDatePartitionName(date);
  79. String hourPartitionName = constructHourPartitionName(date);
  80.  
  81. String hdfsDestDir = stormRootPath + databaseName + "/" + hiveTableName + "/ing_dt=" + datePartitionName + "/ing_hr="
  82. + hourPartitionName;
  83.  
  84. Path destPath = new Path(hdfsDestDir, filePath.getName());
  85.  
  86. Path destDir = new Path(hdfsDestDir);
  87. LOG.debug("destinaltion path and destDir {}", destDir);
  88.  
  89. if (!fileSystem.exists(destDir)) {
  90.  
  91. boolean result = FileSystem.mkdirs(fileSystem, destDir, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
  92.  
  93. if (result) {
  94. LOG.debug("Created directory: {}", destDir);
  95. } else {
  96. LOG.debug("Directory not created");
  97. }
  98. } else {
  99. LOG.debug("Directory Already Exists");
  100. }
  101.  
  102. boolean success = fileSystem.rename(filePath, destPath);
  103. if (success) {
  104. addHivePartition(hiveTableName, datePartitionName, hourPartitionName, hdfsDestDir);
  105. LOG.info("File successfully moved to destination from {} to {}", filePath, destPath);
  106. } else {
  107. LOG.error("Could not move file from {} to {}.", filePath, destPath);
  108. }
  109. return;
  110.  
  111. }
  112.  
  113. private void addHivePartition(String hiveTableName, String datePartitionName, String hourPartitionName, String hdfsDestDir) {
  114.  
  115. String ddl = "ALTER TABLE " + databaseName + "." + hiveTableName + " ADD IF NOT EXISTS PARTITION (ing_dt='"
  116. + datePartitionName
  117. + "', ing_hr='" + hourPartitionName + "') location '" + hdfsDestDir + "'";
  118.  
  119. try {
  120. execHiveDDL(ddl);
  121. } catch (Exception e) {
  122. LOG.error("Failed ddl {}\n", ddl, e);
  123. }
  124. }
  125.  
  126. private void execHiveDDL(String ddl) throws ClassNotFoundException, SQLException {
  127.  
  128. LOG.debug("Executing ddl = " + ddl);
  129.  
  130. Class.forName(configs.getProperty(CommonConstants.DRIVER_NAME));
  131. Connection con = null;
  132. Statement stmt = null;
  133. try {
  134. con = DriverManager.getConnection(configs.getProperty(CommonConstants.JDBC_URL));
  135. stmt = con.createStatement();
  136. stmt.execute(ddl);
  137. } finally {
  138. if (stmt != null) {
  139. try {
  140. stmt.close();
  141. } catch (SQLException ex) {
  142. LOG.error("Error occured while closing the SQL Statement", ex);
  143. }
  144. }
  145. if (con != null) {
  146. try {
  147. con.close();
  148. } catch (SQLException ex) {
  149. LOG.error("Error occured while closing Hive connection", ex);
  150. }
  151. }
  152. }
  153.  
  154. }
  155. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement