Advertisement
detonator413

Avro MultipleOutputs

Aug 4th, 2011
782
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 5 6.05 KB | None | 0 0
  1. package com.mycompany.AvroMultipleOutputs;
  2.  
  3. import org.apache.avro.Schema;
  4. import org.apache.avro.file.CodecFactory;
  5. import org.apache.avro.file.DataFileWriter;
  6. import org.apache.avro.generic.GenericContainer;
  7. import org.apache.avro.mapred.AvroJob;
  8. import org.apache.avro.mapred.AvroOutputFormat;
  9. import org.apache.avro.mapred.AvroWrapper;
  10. import org.apache.avro.reflect.ReflectDatumWriter;
  11. import org.apache.hadoop.fs.Path;
  12. import org.apache.hadoop.io.NullWritable;
  13. import org.apache.hadoop.mapred.*;
  14. import org.slf4j.Logger;
  15. import org.slf4j.LoggerFactory;
  16.  
  17. import java.io.IOException;
  18. import java.net.URLDecoder;
  19. import java.text.NumberFormat;
  20. import java.util.Map;
  21.  
  22. import static org.apache.avro.file.DataFileConstants.DEFAULT_SYNC_INTERVAL;
  23. import static org.apache.avro.file.DataFileConstants.DEFLATE_CODEC;
  24.  
  25.  
  26. /**
  27.  * Author: zholudev
  28.  * Date:   8/2/11
  29.  */
  30.  
  31.  
  32. public class AvroMultipleOutputs<T> {
  33.  
  34.     private static final Logger LOGGER = LoggerFactory.getLogger(AvroMultipleOutputs.class);
  35.  
  36.     private static final String MO_AVRO_CLASS = "mo.namedOutput.avroClass.";
  37.  
  38.     private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
  39.     public static final String SCHEMA_FIELD_NAME = "SCHEMA$";
  40.  
  41.     static {
  42.       NUMBER_FORMAT.setMinimumIntegerDigits(5);
  43.       NUMBER_FORMAT.setGroupingUsed(false);
  44.     }
  45.  
  46.     private String namedOutputFilePath;
  47.     private Schema schema;
  48.     private final AvroWrapper<T> wrapper = new AvroWrapper<T>(null);
  49.     private RecordWriter<AvroWrapper<T>, NullWritable> writer = null;
  50.     private final JobConf job;
  51.  
  52.     public static void addNamedOutput(JobConf job, String namedOutput, Class<? extends GenericContainer> specificAvroClass) {
  53.         if (namedOutput == null || namedOutput.trim().isEmpty()) {
  54.             throw new IllegalArgumentException("Named output cannot be null or empty");
  55.         }
  56.         job.setClass(MO_AVRO_CLASS + namedOutput, specificAvroClass, GenericContainer.class);
  57.     }
  58.  
  59.     public AvroMultipleOutputs(JobConf job, String namedOutput) {
  60.         this.job = job;
  61.         //dots in the name are treated as file system separators
  62.         namedOutputFilePath = namedOutput.replace(".", "/");
  63.         if (!namedOutputFilePath.endsWith("/")) {
  64.             namedOutputFilePath += ".";
  65.         }
  66.         int partition = job.getInt("mapred.task.partition", 1);
  67.         namedOutputFilePath += "part-" + NUMBER_FORMAT.format(partition) + AvroOutputFormat.EXT;
  68.  
  69.         Class clazz = getNamedOutputAvroClass(job, namedOutput);
  70.         if (clazz == null) {
  71.             throw new RuntimeException("No named output with name " + namedOutput + " has been registered");
  72.         }
  73.  
  74.         try {
  75.             schema = (Schema) clazz.getField(SCHEMA_FIELD_NAME).get(null);
  76.             if (schema == null) {
  77.                 //should never happen
  78.                 throw new RuntimeException("Schema must not be null in class " + clazz.getName());
  79.             }
  80.         } catch (Exception e) {
  81.             //should never happen, but just in case...
  82.             LOGGER.error("Implementation error, cannot extract a static field SCHEMA$ from class " + clazz.getName());
  83.             throw new RuntimeException(e);
  84.         }
  85.     }
  86.  
  87.     private static Class<? extends GenericContainer> getNamedOutputAvroClass (JobConf job, String namesOutput) {
  88.         return job.getClass(MO_AVRO_CLASS + namesOutput, null, GenericContainer.class);
  89.     }
  90.  
  91.     public void collect(T datum, Reporter reporter) throws IOException {
  92.         wrapper.datum(datum);
  93.  
  94.         if (null == writer) {
  95.             writer = getRecordWriter(job);
  96.         }
  97.  
  98.         getCollector(reporter).collect(wrapper, NullWritable.get());
  99.     }
  100.  
  101.     public OutputCollector<AvroWrapper<T>, NullWritable> getCollector(final Reporter reporter) throws IOException {
  102.         return new OutputCollector<AvroWrapper<T>, NullWritable>() {
  103.             @Override
  104.             public void collect(AvroWrapper<T> key, NullWritable value) throws IOException {
  105.                 writer.write(key, value);
  106.                 reporter.progress();
  107.             }
  108.         };
  109.     }
  110.  
  111.     //code is taken and slightly adapted from AvroOutputFormat
  112.     public RecordWriter<AvroWrapper<T>, NullWritable>
  113.         getRecordWriter(JobConf job)
  114.             throws IOException {
  115.  
  116.         final DataFileWriter<T> writer =
  117.                 new DataFileWriter<T>(new ReflectDatumWriter<T>());
  118.  
  119.         if (FileOutputFormat.getCompressOutput(job)) {
  120.             int level = job.getInt(AvroOutputFormat.DEFLATE_LEVEL_KEY, AvroOutputFormat.DEFAULT_DEFLATE_LEVEL);
  121.             String codecName = job.get(AvroJob.OUTPUT_CODEC, DEFLATE_CODEC);
  122.             CodecFactory factory = codecName.equals(DEFLATE_CODEC)
  123.                     ? CodecFactory.deflateCodec(level)
  124.                     : CodecFactory.fromString(codecName);
  125.             writer.setCodec(factory);
  126.         }
  127.  
  128.         writer.setSyncInterval(job.getInt(AvroOutputFormat.SYNC_INTERVAL_KEY, DEFAULT_SYNC_INTERVAL));
  129.  
  130.         // copy metadata from job
  131.         for (Map.Entry<String, String> e : job) {
  132.             if (e.getKey().startsWith(AvroJob.TEXT_PREFIX))
  133.                 writer.setMeta(e.getKey().substring(AvroJob.TEXT_PREFIX.length()),
  134.                         e.getValue());
  135.             if (e.getKey().startsWith(AvroJob.BINARY_PREFIX))
  136.                 writer.setMeta(e.getKey().substring(AvroJob.BINARY_PREFIX.length()),
  137.                         URLDecoder.decode(e.getValue(), "ISO-8859-1")
  138.                                 .getBytes("ISO-8859-1"));
  139.         }
  140.  
  141.         Path path = FileOutputFormat.getTaskOutputPath(job, namedOutputFilePath);
  142.         writer.create(schema, path.getFileSystem(job).create(path));
  143.  
  144.         return new RecordWriter<AvroWrapper<T>, NullWritable>() {
  145.             public void write(AvroWrapper<T> wrapper, NullWritable ignore)
  146.                     throws IOException {
  147.                 writer.append(wrapper.datum());
  148.             }
  149.  
  150.             public void close(Reporter reporter) throws IOException {
  151.                 writer.close();
  152.             }
  153.         };
  154.     }
  155.  
  156. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement