Advertisement
Guest User

cccw

a guest
Nov 22nd, 2019
124
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 3.51 KB | None | 0 0
  1. import java.io.IOException;
  2. import java.util.StringTokenizer;
  3.  
  4. import org.apache.hadoop.conf.Configuration;
  5. import org.apache.hadoop.fs.Path;
  6. import org.apache.hadoop.io.ArrayWritable;
  7. import org.apache.hadoop.io.IntWritable;
  8. import org.apache.hadoop.io.Text;
  9. import org.apache.hadoop.io.Writable;
  10. import org.apache.hadoop.mapreduce.Job;
  11. import org.apache.hadoop.mapreduce.Mapper;
  12. import org.apache.hadoop.mapreduce.Reducer;
  13. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  14. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  15.  
  16. public class WeatherMapReduce {
  17.     public static class WeatherMapper extends Mapper<Object, Text, Text, IntWritable> {
  18.  
  19.         private final static Text key = new Text();
  20.         private final static IntWritable value = new IntWritable();
  21.  
  22.         public void map(Object k, Text file, Context context) throws IOException, InterruptedException {
  23.             StringTokenizer itr = new StringTokenizer(file.toString());
  24.             while (itr.hasMoreTokens()) {
  25.                 String line = itr.nextToken();
  26.  
  27.                 if (!line.startsWith("UK000056225") && !line.startsWith("UK000003377")) {
  28.                     continue;
  29.                 }
  30.  
  31.                 String[] data = line.split(",");
  32.  
  33.                 String loc = data[0];
  34.                 String date = data[1];
  35.                 String type = data[2];
  36.                 int val = Integer.parseInt(data[3]);
  37.  
  38.                 if (!type.equals("TMIN") && !type.equals("TMAX")) {
  39.                     continue;
  40.                 }
  41.  
  42.                 key.set(loc  + "," + date);
  43.                 value.set(val);
  44.  
  45.                 context.write(key, value);
  46.             }
  47.         }
  48.     }
  49.  
  50.     public static class WeatherReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  51.         private IntWritable result = new IntWritable();
  52.  
  53.         public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
  54.             boolean firstValueSet = false;
  55.             int firstValue = 0;
  56.             boolean secondValueSet = false;
  57.             int secondValue = 0;
  58.  
  59.             for (IntWritable val : values) {
  60.                 if (firstValueSet) {
  61.                     secondValue = val.get();
  62.                     secondValueSet = true;
  63.                 } else {
  64.                     firstValue = val.get();
  65.                     firstValueSet = true;
  66.                 }
  67.             }
  68.  
  69.             // only output this date if there is both a TMIN and a TMAX
  70.             if (firstValueSet && secondValueSet) {
  71.                 int range = Math.abs(firstValue - secondValue);
  72.                 System.out.println("'" + firstValue + "' and '" + secondValue + "' has range " + range);
  73.                 result.set(range);
  74.                 context.write(key, result);
  75.             }
  76.         }
  77.     }
  78.  
  79.     public static void main(String[] args) throws Exception {
  80.         Configuration conf = new Configuration();
  81.         Job job = Job.getInstance(conf, "WeatherMapReduce");
  82.         job.setJarByClass(WeatherMapReduce.class);
  83.         job.setMapperClass(WeatherMapper.class);
  84.         job.setCombinerClass(WeatherReducer.class);
  85.         job.setReducerClass(WeatherReducer.class);
  86.         job.setOutputKeyClass(Text.class);
  87.         job.setOutputValueClass(IntWritable.class);
  88.         FileInputFormat.addInputPath(job, new Path(args[0]));
  89.         FileOutputFormat.setOutputPath(job, new Path(args[1]));
  90.         System.exit(job.waitForCompletion(true) ? 0 : 1);
  91.     }
  92. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement