Advertisement
Guest User

CopyDirToSequenceFile

a guest
Oct 12th, 2011
98
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 3.90 KB | None | 0 0
  import java.io.ByteArrayOutputStream;
  import java.io.File;
  import java.io.FileInputStream;

  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.BytesWritable;
  import org.apache.hadoop.io.IOUtils;
  import org.apache.hadoop.io.SequenceFile;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.util.ToolRunner;
  14.  
  15. public class CopyDirToSequenceFile extends InputOutputDriver {
  16.   public static final Log LOG = LogFactory.getLog(CopyDirToSequenceFile.class);
  17.   public static void main(String[] args) throws Exception {
  18.     int rc = ToolRunner.run(new CopyDirToSequenceFile(), args);
  19.     System.exit(rc);
  20.  
  21.   }
  22.  
  23.   @Override
  24.   public int process(String[] args) throws Exception {
  25.     Configuration conf = getConf();
  26.     FileSystem fs = FileSystem.get(conf);
  27.     SequenceFile.Writer writer = null;
  28.     try {
  29.       Text key = null;
  30.       BytesWritable value = null;
  31.       File inputDir = new File(input);
  32.       File[] inputFiles = inputDir.listFiles();
  33.       if(inputFiles != null && inputFiles.length > 0) {
  34.         writer = SequenceFile.createWriter(fs, conf,
  35.             new Path(output), Text.class, BytesWritable.class);
  36.         ByteArrayOutputStream out = new ByteArrayOutputStream(4096);
  37.         for (int i = 0; i < inputFiles.length; i++) {
  38.           out.reset();
  39.           FileInputStream in = new FileInputStream(inputFiles[i]);
  40.           try {
  41.             IOUtils.copyBytes(in, out, conf, false);
  42.           } finally {
  43.             IOUtils.cleanup(LOG, in, out);
  44.           }
  45.           String fileName = inputFiles[i].getName();
  46.           key = new Text(fileName);
  47.           value = new BytesWritable(out.toByteArray());
  48.           writer.append(key, value);
  49.         }
  50.       }
  51.     } finally {
  52.       IOUtils.cleanup(LOG, writer);
  53.     }
  54.     return 0;
  55.   }
  56. }
  57.  
  58.  
  59.  
  60.  
  61.  
  62. import java.util.ArrayList;
  63. import java.util.List;
  64.  
  65. import org.apache.hadoop.conf.Configuration;
  66. import org.apache.hadoop.conf.Configured;
  67. import org.apache.hadoop.fs.FileSystem;
  68. import org.apache.hadoop.fs.Path;
  69. import org.apache.hadoop.util.Tool;
  70.  
  71. public abstract class InputOutputDriver extends Configured implements Tool {
  72.   protected String input;
  73.   protected String output;
  74.   protected abstract int process(String[] args) throws Exception;
  75.   public int run(String[] args) throws Exception {
  76.     List<String> remaining = new ArrayList<String>();
  77.     boolean deleteOutput = false;
  78.     for (int i = 0; i < args.length; i++) {
  79.       boolean last = i + 1 == args.length;
  80.       String compareArg = args[i].trim().toLowerCase();
  81.       if(compareArg.equals("-input")) {
  82.         if(last) {
  83.           throw new IllegalArgumentException("Argument " + args[i] + " requires value");
  84.         }
  85.         input = args[++i];
  86.       } else if (compareArg.equals("-output")) {
  87.         if(last) {
  88.           throw new IllegalArgumentException("Argument " + args[i] + " requires value");
  89.         }
  90.         output = args[++i];
  91.       } else if (compareArg.equals("-delete")) {
  92.         deleteOutput = true;
  93.       } else {
  94.         remaining.add(args[i]);
  95.       }
  96.     }
  97.     if(input == null) {
  98.       throw new IllegalArgumentException("-input required");
  99.     }
  100.     if(output == null) {
  101.       throw new IllegalArgumentException("-output required");
  102.     }
  103.     if(deleteOutput) {
  104.       Configuration conf = getConf();
  105.       FileSystem fs = FileSystem.get(conf);
  106.       Path outputPath = new Path(output);
  107.       if(fs.exists(outputPath)) {
  108.         fs.delete(outputPath, true);
  109.       }
  110.     }
  111.     return process(remaining.toArray(new String[]{}));
  112.   }
  113.   public String getInput() {
  114.     return input;
  115.   }
  116.   public void setInput(String input) {
  117.     this.input = input;
  118.   }
  119.   public String getOutput() {
  120.     return output;
  121.   }
  122.   public void setOutput(String output) {
  123.     this.output = output;
  124.   }
  125. }
  126.  
  127.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement