Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import java.io.IOException;
- import java.util.StringTokenizer;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.ArrayWritable;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.io.Writable;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- public class WeatherMapReduce {
- public static class WeatherMapper extends Mapper<Object, Text, Text, IntWritable> {
- private final static Text key = new Text();
- private final static IntWritable value = new IntWritable();
- public void map(Object k, Text file, Context context) throws IOException, InterruptedException {
- StringTokenizer itr = new StringTokenizer(file.toString());
- while (itr.hasMoreTokens()) {
- String line = itr.nextToken();
- if (!line.startsWith("UK000056225") && !line.startsWith("UK000003377")) {
- continue;
- }
- String[] data = line.split(",");
- String loc = data[0];
- String date = data[1];
- String type = data[2];
- int val = Integer.parseInt(data[3]);
- if (!type.equals("TMIN") && !type.equals("TMAX")) {
- continue;
- }
- key.set(loc + "," + date);
- value.set(val);
- context.write(key, value);
- }
- }
- }
- public static class WeatherReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
- private IntWritable result = new IntWritable();
- public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
- boolean firstValueSet = false;
- int firstValue = 0;
- boolean secondValueSet = false;
- int secondValue = 0;
- for (IntWritable val : values) {
- if (firstValueSet) {
- secondValue = val.get();
- secondValueSet = true;
- } else {
- firstValue = val.get();
- firstValueSet = true;
- }
- }
- // only output this date if there is both a TMIN and a TMAX
- if (firstValueSet && secondValueSet) {
- int range = Math.abs(firstValue - secondValue);
- System.out.println("'" + firstValue + "' and '" + secondValue + "' has range " + range);
- result.set(range);
- context.write(key, result);
- }
- }
- }
- public static void main(String[] args) throws Exception {
- Configuration conf = new Configuration();
- Job job = Job.getInstance(conf, "WeatherMapReduce");
- job.setJarByClass(WeatherMapReduce.class);
- job.setMapperClass(WeatherMapper.class);
- job.setCombinerClass(WeatherReducer.class);
- job.setReducerClass(WeatherReducer.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(IntWritable.class);
- FileInputFormat.addInputPath(job, new Path(args[0]));
- FileOutputFormat.setOutputPath(job, new Path(args[1]));
- System.exit(job.waitForCompletion(true) ? 0 : 1);
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement