Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import java.io.BufferedInputStream;
- import java.io.FileInputStream;
- import java.io.InputStream;
- import java.net.URI;
- import java.net.URL;
- import java.time.LocalDateTime;
- import javax.security.auth.Subject;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.FSDataInputStream;
- import org.apache.hadoop.fs.FSDataOutputStream;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.IOUtils;
- import org.apache.hadoop.security.UserGroupInformation;
- import java.security.PrivilegedExceptionAction;
- public class ReadWriteHDFSWithKinit {
- public static void main(String[] args) {
- try {
- final String confDir = "/tmp/conf";
- final String user = args[0];
- if (args.length == 2 && (args[1].equals("impersonate"))) {
- UserGroupInformation ugi = UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser());
- ugi.doAs(new PrivilegedExceptionAction<Void>() {
- public Void run() throws Exception {
- read(args, confDir, null);
- return null;}
- });
- } else {
- read(args, confDir, user);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- public static void read(String[] args, String confDir, String user) throws Exception {
- Configuration conf = new Configuration();
- conf.addResource(new URL("file://" + confDir + "/core-site.xml"));
- conf.addResource(new URL("file://" + confDir + "/hdfs-site.xml"));
- conf.addResource(new URL("file://" + confDir + "/hive-site.xml"));
- //conf.addResource(new URL("file://" + confDir + "/kms-site.xml"));
- conf.addResource(new URL("file://" + confDir + "/yarn-site.xml"));
- //Informatica is relying on the ticket cache holding the ticket instead of using the keytab
- conf.set("hadoop.security.kerberos.ticket.cache.path", "/tmp/krb5cc_foo");
- String ticketCachePath =
- conf.get("hadoop.security.kerberos.ticket.cache.path");
- System.out.println("\n\n\nTicket Cache: " + ticketCachePath);
- System.out.println("\n\n\n");
- final String srcFile = "/tmp/ff1.txt";
- final String tgtFile = "/tmp/ff1_out.txt";
- System.out.println("Opening target at : " + LocalDateTime.now());
- String uri = "hdfs://host1.abc.com:8020";
- FileSystem hdfssrc;
- FileSystem hdfstgt;
- if (user != null) {
- System.out.println("Using FileSystem API .get with user: " + user);
- hdfssrc = FileSystem.get(new URI(uri), conf, user);
- hdfstgt = FileSystem.get(new URI(uri), conf, user);
- } else {
- System.out.println("Using FileSystem API .get without user specified");
- hdfssrc = FileSystem.get(new URI(uri), conf);
- hdfstgt = FileSystem.get(new URI(uri), conf);
- }
- FSDataInputStream inputStream = hdfssrc.open(new Path(srcFile));
- FSDataOutputStream outputStream = hdfstgt.create(new Path(tgtFile));
- System.out.println("Opened target at " + LocalDateTime.now());
- int mainCount = 0;
- int count = 0;
- int mainTimeOut = 5;
- int sleepTimeOut = 30;
- try {
- while (mainCount < mainTimeOut) { //roughly 2.5 minutes
- // 6. Open a Input Stream to read the data from HDFS
- //inputStream = hdfs.open(path);
- //inputStream = new BufferedInputStream(new FileInputStream(srcFile));
- //========================= sleep for 10 secs
- System.out.println(LocalDateTime.now() + "Sleeping for " + sleepTimeOut + " sec");
- count = 0;
- while (count < sleepTimeOut) {
- count++;
- Thread.sleep(1000);
- System.out.print((mainCount*10 + count) + " sec, ");
- }
- System.out.println(LocalDateTime.now() + "Awake now");
- //=========================== copy bytes
- byte [] buffer = new byte[20];
- System.out.println("Copying " + buffer + " at : " + LocalDateTime.now());
- inputStream.read(buffer);
- outputStream.write(buffer);
- //outputStream.write(("Write-" + (mainCount + "")).getBytes());
- System.out.println("Copied at : " + LocalDateTime.now());
- //close the input stream
- //inputStream.close();
- mainCount++;
- }
- } catch (Exception e) {
- // TODO Auto-generated catch block
- System.out.println("Exception: " + e.getMessage());
- //e.printStackTrace();
- } finally {
- // 8. Close the InputStream once the data is read
- //IOUtils.closeStream(inputStream);
- inputStream.close();
- outputStream.close();
- outputStream.flush();
- hdfssrc.close();
- hdfstgt.close();
- }
- }
- }
Add Comment
Please, Sign In to add comment