SHARE
TWEET

BinaryFilesToHadoopSequenceFile

eldadlevy Apr 14th, 2011 3,855 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. package com.eldadlevy.hadoop;
  2.  
  3.  
  4. import java.io.ByteArrayOutputStream;
  5. import java.io.IOException;
  6. import java.net.URI;
  7.  
  8. import org.apache.hadoop.conf.Configuration;
  9. import org.apache.hadoop.fs.FSDataInputStream;
  10. import org.apache.hadoop.fs.FileSystem;
  11. import org.apache.hadoop.fs.Path;
  12. import org.apache.hadoop.io.BytesWritable;
  13. import org.apache.hadoop.io.IOUtils;
  14. import org.apache.hadoop.io.Text;
  15. import org.apache.hadoop.mapreduce.Job;
  16. import org.apache.hadoop.mapreduce.Mapper;
  17. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  18. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  19. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  20. import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
  21. import org.apache.hadoop.util.GenericOptionsParser;
  22. import org.apache.log4j.Logger;
  23.  
  24.  
  25. public class BinaryFilesToHadoopSequenceFile {
  26.  
  27.         private static Logger logger = Logger.getLogger(BinaryFilesToHadoopSequenceFile.class);
  28.  
  29.         public static class BinaryFilesToHadoopSequenceFileMapper extends Mapper<Object, Text, Text, BytesWritable> {
  30.  
  31.                 public void map(Object key, Text value, Context context)
  32.                 throws IOException, InterruptedException {
  33.  
  34.                         logger.info("map method called..");
  35.  
  36.                         String uri = value.toString();
  37.                         Configuration conf = new Configuration();
  38.                         FileSystem fs = FileSystem.get(URI.create(uri), conf);
  39.                         FSDataInputStream in = null;
  40.                         try {
  41.                                 in = fs.open(new Path(uri));
  42.                                 java.io.ByteArrayOutputStream bout = new ByteArrayOutputStream();
  43.                                 byte buffer[] = new byte[1024 * 1024];
  44.  
  45.                                 while( in.read(buffer, 0, buffer.length) >= 0 ) {
  46.                                         bout.write(buffer);
  47.                                 }
  48.                                 context.write(value, new BytesWritable(bout.toByteArray()));
  49.                         } finally {
  50.                                 IOUtils.closeStream(in);
  51.                         }
  52.                 }
  53.  
  54.         }
  55.  
  56.  
  57.         public static void main(String[] args) throws Exception {
  58.                 Configuration conf = new Configuration();
  59.                 String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  60.                 if (otherArgs.length != 2) {
  61.                         System.err.println("Usage: BinaryFilesToHadoopSequenceFile <in Path for url file> <out pat for sequence file>");
  62.                         System.exit(2);
  63.                 }
  64.                
  65.  
  66.                 Job job = new Job(conf, "BinaryFilesToHadoopSequenceFile");
  67.                 job.setJarByClass(BinaryFilesToHadoopSequenceFile.class);
  68.                 job.setMapperClass(BinaryFilesToHadoopSequenceFileMapper.class);
  69.                 job.setOutputKeyClass(Text.class);
  70.                 job.setOutputValueClass(BytesWritable.class);
  71.                 job.setInputFormatClass(TextInputFormat.class);
  72.                 job.setOutputFormatClass(SequenceFileOutputFormat.class);
  73.                
  74.                 FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  75.                 FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
  76.                 System.exit(job.waitForCompletion(true) ? 0 : 1);
  77.         }
  78.  
  79.  
  80. }
RAW Paste Data
Top