Guest User

Untitled

a guest
Sep 14th, 2018
94
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.42 KB | None | 0 0
  1. import java.io.BufferedInputStream;
  2. import java.io.FileInputStream;
  3. import java.io.InputStream;
  4. import java.net.URI;
  5. import java.net.URL;
  6. import java.time.LocalDateTime;
  7.  
  8. import javax.security.auth.Subject;
  9.  
  10. import org.apache.hadoop.conf.Configuration;
  11. import org.apache.hadoop.fs.FSDataInputStream;
  12. import org.apache.hadoop.fs.FSDataOutputStream;
  13. import org.apache.hadoop.fs.FileSystem;
  14. import org.apache.hadoop.fs.Path;
  15. import org.apache.hadoop.io.IOUtils;
  16. import org.apache.hadoop.security.UserGroupInformation;
  17. import java.security.PrivilegedExceptionAction;
  18.  
  19. public class ReadWriteHDFSWithKinit {
  20.  
  21. public static void main(String[] args) {
  22. try {
  23. final String confDir = "/tmp/conf";
  24. final String user = args[0];
  25. if (args.length == 2 && (args[1].equals("impersonate"))) {
  26. UserGroupInformation ugi = UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser());
  27. ugi.doAs(new PrivilegedExceptionAction<Void>() {
  28. public Void run() throws Exception {
  29. read(args, confDir, null);
  30. return null;}
  31. });
  32. } else {
  33. read(args, confDir, user);
  34. }
  35. } catch (Exception e) {
  36. e.printStackTrace();
  37. }
  38. }
  39.  
  40. public static void read(String[] args, String confDir, String user) throws Exception {
  41.  
  42. Configuration conf = new Configuration();
  43. conf.addResource(new URL("file://" + confDir + "/core-site.xml"));
  44. conf.addResource(new URL("file://" + confDir + "/hdfs-site.xml"));
  45. conf.addResource(new URL("file://" + confDir + "/hive-site.xml"));
  46. //conf.addResource(new URL("file://" + confDir + "/kms-site.xml"));
  47. conf.addResource(new URL("file://" + confDir + "/yarn-site.xml"));
  48.  
  49. //Informatica is relying on the ticket cache holding the ticket instead of using the keytab
  50. conf.set("hadoop.security.kerberos.ticket.cache.path", "/tmp/krb5cc_foo");
  51. String ticketCachePath =
  52. conf.get("hadoop.security.kerberos.ticket.cache.path");
  53. System.out.println("\n\n\nTicket Cache: " + ticketCachePath);
  54. System.out.println("\n\n\n");
  55.  
  56. final String srcFile = "/tmp/ff1.txt";
  57. final String tgtFile = "/tmp/ff1_out.txt";
  58.  
  59. System.out.println("Opening target at : " + LocalDateTime.now());
  60. String uri = "hdfs://host1.abc.com:8020";
  61.  
  62. FileSystem hdfssrc;
  63. FileSystem hdfstgt;
  64. if (user != null) {
  65. System.out.println("Using FileSystem API .get with user: " + user);
  66. hdfssrc = FileSystem.get(new URI(uri), conf, user);
  67. hdfstgt = FileSystem.get(new URI(uri), conf, user);
  68. } else {
  69. System.out.println("Using FileSystem API .get without user specified");
  70. hdfssrc = FileSystem.get(new URI(uri), conf);
  71. hdfstgt = FileSystem.get(new URI(uri), conf);
  72. }
  73.  
  74. FSDataInputStream inputStream = hdfssrc.open(new Path(srcFile));
  75. FSDataOutputStream outputStream = hdfstgt.create(new Path(tgtFile));
  76. System.out.println("Opened target at " + LocalDateTime.now());
  77.  
  78. int mainCount = 0;
  79. int count = 0;
  80.  
  81. int mainTimeOut = 5;
  82. int sleepTimeOut = 30;
  83. try {
  84. while (mainCount < mainTimeOut) { //roughly 2.5 minutes
  85.  
  86. // 6. Open a Input Stream to read the data from HDFS
  87. //inputStream = hdfs.open(path);
  88. //inputStream = new BufferedInputStream(new FileInputStream(srcFile));
  89.  
  90. //========================= sleep for 10 secs
  91. System.out.println(LocalDateTime.now() + "Sleeping for " + sleepTimeOut + " sec");
  92. count = 0;
  93. while (count < sleepTimeOut) {
  94. count++;
  95. Thread.sleep(1000);
  96. System.out.print((mainCount*10 + count) + " sec, ");
  97. }
  98. System.out.println(LocalDateTime.now() + "Awake now");
  99.  
  100. //=========================== copy bytes
  101.  
  102. byte [] buffer = new byte[20];
  103. System.out.println("Copying " + buffer + " at : " + LocalDateTime.now());
  104. inputStream.read(buffer);
  105. outputStream.write(buffer);
  106.  
  107. //outputStream.write(("Write-" + (mainCount + "")).getBytes());
  108. System.out.println("Copied at : " + LocalDateTime.now());
  109.  
  110. //close the input stream
  111. //inputStream.close();
  112. mainCount++;
  113.  
  114. }
  115. } catch (Exception e) {
  116. // TODO Auto-generated catch block
  117. System.out.println("Exception: " + e.getMessage());
  118. //e.printStackTrace();
  119. } finally {
  120. // 8. Close the InputStream once the data is read
  121. //IOUtils.closeStream(inputStream);
  122.  
  123. inputStream.close();
  124. outputStream.close();
  125. outputStream.flush();
  126. hdfssrc.close();
  127. hdfstgt.close();
  128. }
  129. }
  130. }
Add Comment
Please, Sign In to add comment