package com.mycompany.AvroMultipleOutputs;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericContainer;
import org.apache.avro.mapred.AvroJob;
import org.apache.avro.mapred.AvroOutputFormat;
import org.apache.avro.mapred.AvroWrapper;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URLDecoder;
import java.text.NumberFormat;
import java.util.Map;
import static org.apache.avro.file.DataFileConstants.DEFAULT_SYNC_INTERVAL;
import static org.apache.avro.file.DataFileConstants.DEFLATE_CODEC;
/**
 * Writes Avro records of type {@code T} to an extra, named output file of a job that uses
 * the old {@code org.apache.hadoop.mapred} API.
 *
 * <p>Usage: register the Avro class for a named output with
 * {@link #addNamedOutput(JobConf, String, Class)} while configuring the job, create an instance
 * inside the task, feed it with {@link #collect(Object, Reporter)} and finally call
 * {@link #close()} so that buffered records are flushed and the file is closed.
 *
 * <p>The output file is opened lazily on the first record, so a task that never collects
 * anything does not create a file. Instances keep unsynchronized mutable state and are
 * therefore not safe for concurrent use.
 *
 * Author: zholudev
 * Date: 8/2/11
 */
public class AvroMultipleOutputs<T> {
    private static final Logger LOGGER = LoggerFactory.getLogger(AvroMultipleOutputs.class);
    /** Prefix of the job property that stores the Avro class of a named output. */
    private static final String MO_AVRO_CLASS = "mo.namedOutput.avroClass.";
    /** Formats the task partition as at least five digits without grouping; guarded by itself. */
    private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
    /** Name of the public static field that holds the schema of the registered Avro class. */
    public static final String SCHEMA_FIELD_NAME = "SCHEMA$";

    static {
        NUMBER_FORMAT.setMinimumIntegerDigits(5);
        NUMBER_FORMAT.setGroupingUsed(false);
    }

    /** Path of the output file, relative to the task output directory. */
    private final String namedOutputFilePath;
    /** Schema read from the {@code SCHEMA$} field of the registered Avro class. */
    private final Schema schema;
    /** Reused for every record to avoid allocating a wrapper per datum. */
    private final AvroWrapper<T> wrapper = new AvroWrapper<T>(null);
    /** Lazily created on the first record; {@code null} until then and after {@link #close()}. */
    private RecordWriter<AvroWrapper<T>, NullWritable> writer = null;
    /** Set by {@link #close()}; reopening would recreate the same output path. */
    private boolean closed = false;
    private final JobConf job;

    /**
     * Registers the Avro class whose schema is used for the given named output.
     *
     * @param job               job configuration to store the registration in
     * @param namedOutput       name of the output; must not be null or blank
     * @param specificAvroClass class exposing a public static {@code SCHEMA$} field
     * @throws IllegalArgumentException if the name is null/blank or the class is null
     */
    public static void addNamedOutput(JobConf job, String namedOutput, Class<? extends GenericContainer> specificAvroClass) {
        requireValidName(namedOutput);
        if (specificAvroClass == null) {
            throw new IllegalArgumentException("Avro class for named output " + namedOutput + " cannot be null");
        }
        job.setClass(MO_AVRO_CLASS + namedOutput, specificAvroClass, GenericContainer.class);
    }

    /**
     * Prepares (but does not yet open) the named output for the current task.
     *
     * <p>Dots in {@code namedOutput} are treated as file system separators. The file name is
     * the last name segment followed by {@code ".part-NNNNN"} and {@code AvroOutputFormat.EXT},
     * where {@code NNNNN} is the value of {@code mapred.task.partition} (1 when the property
     * is absent).
     *
     * @param job         configuration of the running task
     * @param namedOutput name previously passed to {@link #addNamedOutput}
     * @throws IllegalArgumentException if {@code job} is null or the name is null/blank
     * @throws RuntimeException         if the name was not registered or the registered class
     *                                  does not expose a usable {@code SCHEMA$} field
     */
    public AvroMultipleOutputs(JobConf job, String namedOutput) {
        if (job == null) {
            throw new IllegalArgumentException("Job configuration cannot be null");
        }
        requireValidName(namedOutput);
        this.job = job;

        Class<? extends GenericContainer> avroClass = getNamedOutputAvroClass(job, namedOutput);
        if (avroClass == null) {
            throw new RuntimeException("No named output with name " + namedOutput + " has been registered");
        }
        this.schema = extractSchema(avroClass);

        //dots in the name are treated as file system separators
        String filePath = namedOutput.replace(".", "/");
        if (!filePath.endsWith("/")) {
            filePath += ".";
        }
        int partition = job.getInt("mapred.task.partition", 1);
        this.namedOutputFilePath = filePath + "part-" + formatPartition(partition) + AvroOutputFormat.EXT;
    }

    /** Rejects null or blank output names with the same message in every entry point. */
    private static void requireValidName(String namedOutput) {
        if (namedOutput == null || namedOutput.trim().isEmpty()) {
            throw new IllegalArgumentException("Named output cannot be null or empty");
        }
    }

    /** Formats the partition number; synchronized because NumberFormat is not thread-safe. */
    private static String formatPartition(int partition) {
        synchronized (NUMBER_FORMAT) {
            return NUMBER_FORMAT.format(partition);
        }
    }

    /** Reads the static {@code SCHEMA$} field of the given class and fails if it is unusable. */
    private static Schema extractSchema(Class<? extends GenericContainer> avroClass) {
        final Schema found;
        try {
            // Only the reflective lookup is guarded, so the null-schema error below is not re-wrapped.
            found = (Schema) avroClass.getField(SCHEMA_FIELD_NAME).get(null);
        } catch (Exception e) {
            //should never happen, but just in case...
            String message = "Implementation error, cannot extract a static field SCHEMA$ from class "
                    + avroClass.getName();
            LOGGER.error(message, e);
            throw new RuntimeException(message, e);
        }
        if (found == null) {
            //should never happen
            throw new RuntimeException("Schema must not be null in class " + avroClass.getName());
        }
        return found;
    }

    private static Class<? extends GenericContainer> getNamedOutputAvroClass(JobConf job, String namesOutput) {
        return job.getClass(MO_AVRO_CLASS + namesOutput, null, GenericContainer.class);
    }

    /** Returns the shared record writer, opening the output file on first use. */
    private RecordWriter<AvroWrapper<T>, NullWritable> ensureWriter() throws IOException {
        if (closed) {
            throw new IllegalStateException("Named output " + namedOutputFilePath + " has already been closed");
        }
        if (writer == null) {
            writer = getRecordWriter(job);
        }
        return writer;
    }

    /**
     * Appends one record to the named output and reports progress.
     *
     * @param datum    record to write
     * @param reporter task reporter; progress is reported after the write, may be null
     * @throws IOException           if the file cannot be opened or written
     * @throws IllegalStateException if this instance has been closed
     */
    public void collect(T datum, Reporter reporter) throws IOException {
        wrapper.datum(datum);
        ensureWriter().write(wrapper, NullWritable.get());
        if (reporter != null) {
            reporter.progress();
        }
    }

    /**
     * Returns a collector that writes to the same file as {@link #collect(Object, Reporter)}.
     * The file is opened on the first collected record, not when the collector is created.
     *
     * @param reporter task reporter; progress is reported after every write, may be null
     * @return collector backed by this instance's writer
     * @throws IOException declared for API compatibility
     */
    public OutputCollector<AvroWrapper<T>, NullWritable> getCollector(final Reporter reporter) throws IOException {
        return new OutputCollector<AvroWrapper<T>, NullWritable>() {
            @Override
            public void collect(AvroWrapper<T> key, NullWritable value) throws IOException {
                ensureWriter().write(key, value);
                if (reporter != null) {
                    reporter.progress();
                }
            }
        };
    }

    /**
     * Flushes and closes the output file if it was opened. Safe to call more than once;
     * after closing, further writes fail with {@link IllegalStateException}.
     *
     * @throws IOException if closing the underlying Avro file fails
     */
    public void close() throws IOException {
        closed = true;
        if (writer != null) {
            try {
                writer.close(Reporter.NULL);
            } finally {
                writer = null;
            }
        }
    }

    //code is taken and slightly adapted from AvroOutputFormat
    /**
     * Creates a new record writer for this named output. Every call creates the output file
     * anew, and the caller owns the returned writer and must close it.
     *
     * @param job configuration supplying compression, sync interval and metadata settings
     * @return writer appending the wrapped datum of every key to the Avro file
     * @throws IOException if the file cannot be created
     */
    public RecordWriter<AvroWrapper<T>, NullWritable>
    getRecordWriter(JobConf job)
            throws IOException {
        final DataFileWriter<T> dataWriter =
                new DataFileWriter<T>(new ReflectDatumWriter<T>());

        // Codec, sync interval and metadata must be configured before create() is called.
        if (FileOutputFormat.getCompressOutput(job)) {
            int level = job.getInt(AvroOutputFormat.DEFLATE_LEVEL_KEY, AvroOutputFormat.DEFAULT_DEFLATE_LEVEL);
            String codecName = job.get(AvroJob.OUTPUT_CODEC, DEFLATE_CODEC);
            CodecFactory factory = codecName.equals(DEFLATE_CODEC)
                    ? CodecFactory.deflateCodec(level)
                    : CodecFactory.fromString(codecName);
            dataWriter.setCodec(factory);
        }
        dataWriter.setSyncInterval(job.getInt(AvroOutputFormat.SYNC_INTERVAL_KEY, DEFAULT_SYNC_INTERVAL));

        // copy metadata from job
        for (Map.Entry<String, String> e : job) {
            String key = e.getKey();
            if (key.startsWith(AvroJob.TEXT_PREFIX)) {
                dataWriter.setMeta(key.substring(AvroJob.TEXT_PREFIX.length()), e.getValue());
            }
            if (key.startsWith(AvroJob.BINARY_PREFIX)) {
                dataWriter.setMeta(key.substring(AvroJob.BINARY_PREFIX.length()),
                        URLDecoder.decode(e.getValue(), "ISO-8859-1").getBytes("ISO-8859-1"));
            }
        }

        Path path = FileOutputFormat.getTaskOutputPath(job, namedOutputFilePath);
        final java.io.OutputStream out = path.getFileSystem(job).create(path);
        boolean created = false;
        try {
            dataWriter.create(schema, out);
            created = true;
        } finally {
            if (!created) {
                // The data writer does not own the stream yet, so release it here.
                try {
                    out.close();
                } catch (IOException ignored) {
                    // keep the original failure from create()
                }
            }
        }

        return new RecordWriter<AvroWrapper<T>, NullWritable>() {
            @Override
            public void write(AvroWrapper<T> wrapper, NullWritable ignore)
                    throws IOException {
                dataWriter.append(wrapper.datum());
            }

            @Override
            public void close(Reporter reporter) throws IOException {
                dataWriter.close();
            }
        };
    }
}