Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package com.mycompany.AvroMultipleOutputs;
- import org.apache.avro.Schema;
- import org.apache.avro.file.CodecFactory;
- import org.apache.avro.file.DataFileWriter;
- import org.apache.avro.generic.GenericContainer;
- import org.apache.avro.mapred.AvroJob;
- import org.apache.avro.mapred.AvroOutputFormat;
- import org.apache.avro.mapred.AvroWrapper;
- import org.apache.avro.reflect.ReflectDatumWriter;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.NullWritable;
- import org.apache.hadoop.mapred.*;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import java.io.IOException;
- import java.net.URLDecoder;
- import java.text.NumberFormat;
- import java.util.Map;
- import static org.apache.avro.file.DataFileConstants.DEFAULT_SYNC_INTERVAL;
- import static org.apache.avro.file.DataFileConstants.DEFLATE_CODEC;
- /**
- * Author: zholudev
- * Date: 8/2/11
- */
- public class AvroMultipleOutputs<T> {
- private static final Logger LOGGER = LoggerFactory.getLogger(AvroMultipleOutputs.class);
- private static final String MO_AVRO_CLASS = "mo.namedOutput.avroClass.";
- private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
- public static final String SCHEMA_FIELD_NAME = "SCHEMA$";
- static {
- NUMBER_FORMAT.setMinimumIntegerDigits(5);
- NUMBER_FORMAT.setGroupingUsed(false);
- }
- private String namedOutputFilePath;
- private Schema schema;
- private final AvroWrapper<T> wrapper = new AvroWrapper<T>(null);
- private RecordWriter<AvroWrapper<T>, NullWritable> writer = null;
- private final JobConf job;
- public static void addNamedOutput(JobConf job, String namedOutput, Class<? extends GenericContainer> specificAvroClass) {
- if (namedOutput == null || namedOutput.trim().isEmpty()) {
- throw new IllegalArgumentException("Named output cannot be null or empty");
- }
- job.setClass(MO_AVRO_CLASS + namedOutput, specificAvroClass, GenericContainer.class);
- }
- public AvroMultipleOutputs(JobConf job, String namedOutput) {
- this.job = job;
- //dots in the name are treated as file system separators
- namedOutputFilePath = namedOutput.replace(".", "/");
- if (!namedOutputFilePath.endsWith("/")) {
- namedOutputFilePath += ".";
- }
- int partition = job.getInt("mapred.task.partition", 1);
- namedOutputFilePath += "part-" + NUMBER_FORMAT.format(partition) + AvroOutputFormat.EXT;
- Class clazz = getNamedOutputAvroClass(job, namedOutput);
- if (clazz == null) {
- throw new RuntimeException("No named output with name " + namedOutput + " has been registered");
- }
- try {
- schema = (Schema) clazz.getField(SCHEMA_FIELD_NAME).get(null);
- if (schema == null) {
- //should never happen
- throw new RuntimeException("Schema must not be null in class " + clazz.getName());
- }
- } catch (Exception e) {
- //should never happen, but just in case...
- LOGGER.error("Implementation error, cannot extract a static field SCHEMA$ from class " + clazz.getName());
- throw new RuntimeException(e);
- }
- }
- private static Class<? extends GenericContainer> getNamedOutputAvroClass (JobConf job, String namesOutput) {
- return job.getClass(MO_AVRO_CLASS + namesOutput, null, GenericContainer.class);
- }
- public void collect(T datum, Reporter reporter) throws IOException {
- wrapper.datum(datum);
- if (null == writer) {
- writer = getRecordWriter(job);
- }
- getCollector(reporter).collect(wrapper, NullWritable.get());
- }
- public OutputCollector<AvroWrapper<T>, NullWritable> getCollector(final Reporter reporter) throws IOException {
- return new OutputCollector<AvroWrapper<T>, NullWritable>() {
- @Override
- public void collect(AvroWrapper<T> key, NullWritable value) throws IOException {
- writer.write(key, value);
- reporter.progress();
- }
- };
- }
- //code is taken and slightly adapted from AvroOutputFormat
- public RecordWriter<AvroWrapper<T>, NullWritable>
- getRecordWriter(JobConf job)
- throws IOException {
- final DataFileWriter<T> writer =
- new DataFileWriter<T>(new ReflectDatumWriter<T>());
- if (FileOutputFormat.getCompressOutput(job)) {
- int level = job.getInt(AvroOutputFormat.DEFLATE_LEVEL_KEY, AvroOutputFormat.DEFAULT_DEFLATE_LEVEL);
- String codecName = job.get(AvroJob.OUTPUT_CODEC, DEFLATE_CODEC);
- CodecFactory factory = codecName.equals(DEFLATE_CODEC)
- ? CodecFactory.deflateCodec(level)
- : CodecFactory.fromString(codecName);
- writer.setCodec(factory);
- }
- writer.setSyncInterval(job.getInt(AvroOutputFormat.SYNC_INTERVAL_KEY, DEFAULT_SYNC_INTERVAL));
- // copy metadata from job
- for (Map.Entry<String, String> e : job) {
- if (e.getKey().startsWith(AvroJob.TEXT_PREFIX))
- writer.setMeta(e.getKey().substring(AvroJob.TEXT_PREFIX.length()),
- e.getValue());
- if (e.getKey().startsWith(AvroJob.BINARY_PREFIX))
- writer.setMeta(e.getKey().substring(AvroJob.BINARY_PREFIX.length()),
- URLDecoder.decode(e.getValue(), "ISO-8859-1")
- .getBytes("ISO-8859-1"));
- }
- Path path = FileOutputFormat.getTaskOutputPath(job, namedOutputFilePath);
- writer.create(schema, path.getFileSystem(job).create(path));
- return new RecordWriter<AvroWrapper<T>, NullWritable>() {
- public void write(AvroWrapper<T> wrapper, NullWritable ignore)
- throws IOException {
- writer.append(wrapper.datum());
- }
- public void close(Reporter reporter) throws IOException {
- writer.close();
- }
- };
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement