Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package csw.hadoop.reuse;
- import java.io.DataInput;
- import java.io.DataOutput;
- import java.io.IOException;
- import java.io.PrintStream;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.conf.Configured;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.SequenceFile;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.io.WritableComparable;
- import org.apache.hadoop.io.WritableComparator;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
- import org.apache.hadoop.util.Tool;
- import org.apache.hadoop.util.ToolRunner;
- /**
- * Example job which takes a dataset of [identifier,timestamp,speed] and
- * calculates (naively) the total distance travelled by each identifier
- * <p>
- * Shows how a Key Group Comparator can be used, along with subtle key reuse in
- * the reducer, to achieve the output
- */
- public class KeyReducerReuseExample extends Configured implements Tool {
- @Override
- public int run(String[] arg0) throws Exception {
- Job job = new Job(getConf());
- job.setJarByClass(KeyReducerReuseExample.class);
- Configuration conf = job.getConfiguration();
- job.setInputFormatClass(TextInputFormat.class);
- TextInputFormat.setInputPaths(job, createTestFile(conf));
- job.setMapperClass(KrreMapper.class);
- job.setReducerClass(KrreReducer.class);
- job.setOutputFormatClass(SequenceFileOutputFormat.class);
- Path outputPath = new Path(conf.get("outputPath"));
- FileSystem.get(conf).delete(outputPath, true);
- SequenceFileOutputFormat.setOutputPath(job, outputPath);
- job.setMapOutputKeyClass(CompositeKey.class);
- job.setMapOutputValueClass(IntWritable.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(IntWritable.class);
- job.setGroupingComparatorClass(CompositeKeyGroupComparator.class);
- job.setNumReduceTasks(1);
- if (job.waitForCompletion(true)) {
- dumpSeqFile(new Path(outputPath, "part-r-00000"));
- return 0;
- } else {
- return 1;
- }
- }
- private void dumpSeqFile(Path path) throws IOException {
- Configuration conf = getConf();
- SequenceFile.Reader reader = new SequenceFile.Reader(
- FileSystem.get(conf), path, conf);
- Text key = new Text();
- IntWritable val = new IntWritable();
- while (reader.next(key, val)) {
- System.err.printf("%s\t%s\n", key, val);
- }
- reader.close();
- }
- /**
- * Create a test file
- *
- * @param conf
- * @return
- * @throws IOException
- */
- private String createTestFile(Configuration conf) throws IOException {
- FileSystem fs = FileSystem.get(conf);
- String file = "krre-test.csv";
- PrintStream ps = new PrintStream(fs.create(new Path(file), true));
- long now = System.currentTimeMillis();
- for (long ts = now - 60000; ts <= now; ts += 1000) {
- ps.printf("%s,%d,%d\n", "A", ts, 1);
- ps.printf("%s,%d,%d\n", "B", ts, 2);
- ps.printf("%s,%d,%d\n", "C", ts, 3);
- }
- ps.close();
- return file;
- }
- /**
- * Container for a string identifier and a timestamp
- */
- public static class CompositeKey implements
- WritableComparable<CompositeKey> {
- public Text identifier = new Text();
- public long timestamp;
- @Override
- public void readFields(DataInput in) throws IOException {
- identifier.readFields(in);
- timestamp = in.readLong();
- }
- @Override
- public void write(DataOutput out) throws IOException {
- identifier.write(out);
- out.writeLong(timestamp);
- }
- @Override
- public int compareTo(CompositeKey o) {
- int c = identifier.compareTo(o.identifier);
- if (c != 0) {
- return c;
- }
- if (timestamp < o.timestamp) {
- return -1;
- } else {
- return timestamp == o.timestamp ? 0 : 1;
- }
- }
- @Override
- public int hashCode() {
- // we want to partition on the identifier field only, so all records
- // values for the same identifier end up in 1 reducer
- return identifier.hashCode();
- }
- @Override
- public String toString() {
- return String.format("%s,%d", identifier, timestamp);
- }
- }
- /**
- * Key grouper which only considers the identifier component of the key
- */
- public static class CompositeKeyGroupComparator extends WritableComparator {
- protected CompositeKeyGroupComparator() {
- super(CompositeKey.class, true);
- }
- @Override
- public int compare(WritableComparable a, WritableComparable b) {
- return ((CompositeKey) a).identifier
- .compareTo(((CompositeKey) b).identifier);
- }
- }
- public static class KrreMapper extends
- Mapper<LongWritable, Text, CompositeKey, IntWritable> {
- protected CompositeKey outputKey = new CompositeKey();
- protected IntWritable outputValue = new IntWritable();
- @Override
- protected void map(LongWritable key, Text value, Context context)
- throws IOException, InterruptedException {
- String tokens[] = value.toString().split(",");
- outputKey.identifier.set(tokens[0]);
- outputKey.timestamp = Long.parseLong(tokens[1]);
- outputValue.set(Integer.parseInt(tokens[2]));
- context.write(outputKey, outputValue);
- }
- }
- /**
- * Naive reducer which calculates the distance travelled for a given
- * identifer from collection of timestamp and speed measurements
- * <p>
- * Obviously this implementation is flawed in it's distance calculation
- */
- public static class KrreReducer extends
- Reducer<CompositeKey, IntWritable, Text, IntWritable> {
- @Override
- protected void reduce(CompositeKey key, Iterable<IntWritable> values,
- Context context) throws IOException, InterruptedException {
- long prevTimestamp = 0;
- int dist = 0;
- for (IntWritable value : values) {
- if (prevTimestamp != 0) {
- // subtly the component values of the key change with each
- // iteration so we can track the delta in timestamps
- long timeDelta = key.timestamp - prevTimestamp;
- dist += timeDelta * value.get();
- }
- prevTimestamp = key.timestamp;
- }
- context.write(key.identifier, new IntWritable(dist));
- }
- }
- public static void main(String args[]) throws Exception {
- System.exit(ToolRunner.run(new KeyReducerReuseExample(), args));
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement