ChrisWhite199

Stack Overflow Q10140171

Apr 13th, 2012
////////////////////////////////////////////////////////////////////////////////
// Extension of TextOutputFormat that allows for value concatenation:
// a record written with a null (or NullWritable) key is appended to the
// current output line, separated by a configurable value delimiter, rather
// than starting a new line. This lets a reducer stream many values for a
// single key without first building one huge concatenated value in memory.
// See http://stackoverflow.com/questions/10140171/handling-large-output-values-from-reduce-step-in-hadoop
////////////////////////////////////////////////////////////////////////////////

package sandbox;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;

public class StreamingTextOutputFormat<K, V> extends TextOutputFormat<K, V> {
    protected static class StreamingLineRecordWriter<K, V> implements
            RecordWriter<K, V> {
        private static final String utf8 = "UTF-8";
        private static final byte[] newline;

        static {
            try {
                newline = "\n".getBytes(utf8);
            } catch (UnsupportedEncodingException uee) {
                throw new IllegalArgumentException("can't find " + utf8
                        + " encoding");
            }
        }

        protected DataOutputStream out;
        private final byte[] keyValueSeparator;
        private final byte[] valueDelimiter;
        private boolean dataWritten = false;

        public StreamingLineRecordWriter(DataOutputStream out,
                String keyValueSeparator, String valueDelimiter) {
            this.out = out;
            try {
                this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
                this.valueDelimiter = valueDelimiter.getBytes(utf8);
            } catch (UnsupportedEncodingException uee) {
                throw new IllegalArgumentException("can't find " + utf8
                        + " encoding");
            }
        }

        public StreamingLineRecordWriter(DataOutputStream out) {
            this(out, "\t", ",");
        }

        /**
         * Write the object to the byte stream, handling Text as a special case.
         *
         * @param o
         *            the object to print
         * @throws IOException
         *             if the write throws, we pass it on
         */
        private void writeObject(Object o) throws IOException {
            if (o instanceof Text) {
                Text to = (Text) o;
                out.write(to.getBytes(), 0, to.getLength());
            } else {
                out.write(o.toString().getBytes(utf8));
            }
        }

        public synchronized void write(K key, V value) throws IOException {

            boolean nullKey = key == null || key instanceof NullWritable;
            boolean nullValue = value == null || value instanceof NullWritable;
            if (nullKey && nullValue) {
                return;
            }

            if (!nullKey) {
                // if we've written data before, append a new line
                if (dataWritten) {
                    out.write(newline);
                }

                // write out the key and separator
                writeObject(key);
                out.write(keyValueSeparator);
            } else if (!nullValue) {
                // write out the value delimiter
                out.write(valueDelimiter);
            }

            // write out the value
            writeObject(value);

            // track that we've written some data
            dataWritten = true;
        }

        public synchronized void close(Reporter reporter) throws IOException {
            // if we've written out any data, append a closing newline
            if (dataWritten) {
                out.write(newline);
            }

            out.close();
        }
    }

    @Override
    public RecordWriter<K, V> getRecordWriter(FileSystem fileSystem,
            JobConf job, String name, Progressable progress) throws IOException {
        boolean isCompressed = getCompressOutput(job);
        String keyValueSeparator = job.get("mapred.textoutputformat.separator",
                "\t");
        String valueDelimiter = job.get("mapred.textoutputformat.delimiter",
                ",");
        if (!isCompressed) {
            Path file = FileOutputFormat.getTaskOutputPath(job, name);
            FileSystem fs = file.getFileSystem(job);
            FSDataOutputStream fileOut = fs.create(file, progress);
            return new StreamingLineRecordWriter<K, V>(fileOut,
                    keyValueSeparator, valueDelimiter);
        } else {
            Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(
                    job, GzipCodec.class);
            // create the named codec
            CompressionCodec codec = ReflectionUtils.newInstance(codecClass,
                    job);
            // build the filename including the extension
            Path file = FileOutputFormat.getTaskOutputPath(job,
                    name + codec.getDefaultExtension());
            FileSystem fs = file.getFileSystem(job);
            FSDataOutputStream fileOut = fs.create(file, progress);
            return new StreamingLineRecordWriter<K, V>(new DataOutputStream(
                    codec.createOutputStream(fileOut)), keyValueSeparator,
                    valueDelimiter);
        }
    }
}

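For context, this is roughly how a reducer would drive the format: emit the key with the first value, then emit every remaining value against a null key so the RecordWriter above appends it to the same line using the value delimiter. The sketch below is not part of the original answer; the class name is hypothetical and it assumes Text keys and values with the old mapred API.

////////////////////////////////////////////////////////////////
// Example reducer (hypothetical sketch, not from the original
// answer) showing the null-key concatenation pattern
////////////////////////////////////////////////////////////////

package sandbox;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class StreamingConcatReducer extends MapReduceBase implements
        Reducer<Text, Text, Text, Text> {

    public void reduce(Text key, Iterator<Text> values,
            OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {
        boolean first = true;
        while (values.hasNext()) {
            if (first) {
                // first value: StreamingLineRecordWriter starts a new line
                // and writes "key<separator>value"
                output.collect(key, values.next());
                first = false;
            } else {
                // remaining values: a null key makes the writer append
                // "<delimiter>value" to the current line instead of starting
                // a new record, so the full value list is never held in
                // memory at once
                output.collect(null, values.next());
            }
        }
    }
}
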
////////////////////////////////////////////////////////////////
// JUnit test (it doesn't assert anything, but you can manually
// verify the file contents)
////////////////////////////////////////////////////////////////

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.junit.Test;

import sandbox.StreamingTextOutputFormat;

public class StreamingTextOutputFormatTest {

    @Test
    public void testOutputFormat() throws IOException {

        JobConf job = new JobConf(new Configuration());
        StreamingTextOutputFormat<Text, Text> outFormat = new StreamingTextOutputFormat<Text, Text>();
        StreamingTextOutputFormat.setOutputPath(job, new Path("target/test"));
        Path jobTmpDir = new Path(StreamingTextOutputFormat.getOutputPath(job), FileOutputCommitter.TEMP_DIR_NAME);
        FileSystem fs = jobTmpDir.getFileSystem(job);
        fs.mkdirs(jobTmpDir);

        job.set("mapred.task.id",
                new TaskAttemptID("jt", 1, true, 1, 1).toString());

        RecordWriter<Text, Text> writer = outFormat.getRecordWriter(
                FileSystem.getLocal(job), job, "test", null);

        writer.write(new Text("key1"), new Text("value1"));

        writer.write(new Text("key2"), new Text("value1"));
        writer.write(null, new Text("value2"));
        writer.write(null, new Text("value3"));

        writer.write(new Text("key3"), new Text("value1"));
        writer.write(null, new Text("value2"));

        writer.close(null);
    }
}

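Given the writes in the test, the default tab separator and the default "," delimiter, the output file should contain three lines (tab between key and first value):

key1    value1
key2    value1,value2,value3
key3    value1,value2

And a hedged sketch of how the output format might be wired into a job driver (driver and reducer class names are hypothetical; the two property names are the ones read in getRecordWriter() above):

////////////////////////////////////////////////////////////////
// Example job driver (hypothetical sketch, not from the
// original answer)
////////////////////////////////////////////////////////////////

package sandbox;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class StreamingConcatDriver {

    public static void main(String[] args) throws Exception {
        JobConf job = new JobConf(StreamingConcatDriver.class);
        job.setJobName("streaming-concat");

        // mapper / input format configuration for the real job goes here

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // hypothetical reducer from the sketch above
        job.setReducerClass(StreamingConcatReducer.class);

        // use the streaming format instead of the stock TextOutputFormat
        job.setOutputFormat(StreamingTextOutputFormat.class);

        // optional overrides; these are the defaults read in getRecordWriter()
        job.set("mapred.textoutputformat.separator", "\t");
        job.set("mapred.textoutputformat.delimiter", ",");

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        JobClient.runJob(job);
    }
}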