Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package pl.edu.wat;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.conf.Configured;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.*;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
- import org.apache.hadoop.util.GenericOptionsParser;
- import java.io.IOException;
- import java.util.function.Function;
- import java.util.stream.StreamSupport;
- import static java.lang.System.err;
- import static java.lang.System.exit;
- // Program start
- public final class AirlineApp extends Configured {
- //
- static final class IntArrayWritable extends ArrayWritable{
- IntArrayWritable() {
- super(IntWritable.class);
- }
- IntWritable get(int index) {
- return (IntWritable) this.get()[index];
- }
- int getInt(int index) {
- return this.get(index).get();
- }
- }
- //przyjmuje 4 parametry klucz na wejsciu, wartosc na wejsciu, klucz na wyjsciu wartosc na wyjsciu
- //to jest object klucz WE WARTOSC WE KLUCZ WY WARTOSC WY
- public static final class AirlineMapper extends Mapper<Object, Text, Text, IntArrayWritable> {
- private static final String Delimiter = ",";
- private static final int CharP = Character.getNumericValue('P');
- //pierwsza linia ktora bedzie na maperze to bedzie naglowek wiec na tym etapie pomijamy caly wiersz
- private final Text TailNum = new Text();
- private final IntWritable AirTime = new IntWritable(); //bierzemy Czas lotu
- private final IntWritable Distance = new IntWritable(); //bierzemy dystans
- //MOJE
- private final IntWritable Age = new IntWritable(); //biore wiek
- private final
- private final Writable[] Values = new Writable[]{AirTime, Distance};
- private final IntArrayWritable Array = new IntArrayWritable(); //to wszystko wpada do tablicy
- @Override //OBJECT KEY - NR LINII PLIKU MUSI BYC, TEXT VALUE TO CALY WIERSZ PLIKU, TYPY GENERYCZNY (kontekst ktory musi byc), throws exeption bo musi byc
- protected void map(Object key, Text value, Mapper<Object, Text, Text, IntArrayWritable>.Context context) throws IOException, InterruptedException {
- if (CharP == value.charAt(0)) {
- return;
- }
- String[] values = value.toString().split(Delimiter);
- this.TailNum.set(values[10]);
- try {
- this.AirTime.set(Integer.parseInt(values[13]));
- this.Distance.set(Integer.parseInt(values[18]));
- } catch (NumberFormatException var6) {
- return;
- }
- this.Array.set(this.Values);
- context.write(this.TailNum, this.Array);
- }
- }
- public static class SpeedReducer extends Reducer<Text, IntArrayWritable, Text, FloatWritable> {
- private final FloatWritable result = new FloatWritable();
- @Override
- protected void reduce(Text key, Iterable<IntArrayWritable> values, Context context) throws IOException, InterruptedException {
- // [0]: totalAirTime, [1]: totalDistance
- int[] total = Sum(values, v -> v::getInt);
- // miles / minutes -> km / h
- result.set(96.5604f * total[1] / total[0]);
- context.write(key, result);
- }
- }
- public static void main(String[] args) throws Exception {
- Configuration conf = new Configuration();
- String[] jobArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
- if (jobArgs.length != 2) {
- err.println("Usage: pl.edu.wat.AirlineApp <in> <out>");
- System.exit(2);
- }
- Job job = Job.getInstance(conf, "pl.edu.wat.AirlineApp");
- job.setJarByClass(AirlineApp.class);
- job.setMapperClass(AirlineMapper.class);
- job.setReducerClass(SpeedReducer.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(FloatWritable.class);
- // Same argument types as reducer?
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(IntArrayWritable.class);
- FileInputFormat.addInputPath(job, new Path(jobArgs[0]));
- FileOutputFormat.setOutputPath(job, new Path(jobArgs[1])); //wyladuje tyle plikow ile bylo reducerow stworzy plik SUCCESS lub FAILED
- job.setInputFormatClass(TextInputFormat.class); //plik tekstowy typu wejsciowego
- job.setOutputFormatClass(TextOutputFormat.class); //plik tekstowy typu wyjsciowego
- boolean result = job.waitForCompletion(true);
- exit(result ? 0 : 1); //czeka na joba, zbiera logi, czeka i konczy dzialanie
- }
- // mapper - function currying
- private static <T> int[] Sum(Iterable<T> values, Function<? super T, Function<Integer, Integer>> mapper) {
- int[] result = new int[]{0, 0};
- StreamSupport.stream(values.spliterator(), false).forEach(v -> {
- Function<Integer, Integer> capture = mapper.apply(v);
- result[0] += capture.apply(0);
- result[1] += capture.apply(1);
- });
- return result;
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement