Advertisement
Guest User

Untitled

a guest
Mar 26th, 2019
83
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.42 KB | None | 0 0
  1. package pl.edu.wat;
  2.  
  3. import org.apache.hadoop.conf.Configuration;
  4. import org.apache.hadoop.conf.Configured;
  5. import org.apache.hadoop.fs.Path;
  6. import org.apache.hadoop.io.*;
  7. import org.apache.hadoop.mapreduce.Job;
  8. import org.apache.hadoop.mapreduce.Mapper;
  9. import org.apache.hadoop.mapreduce.Reducer;
  10. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  11. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  12. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  13. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  14. import org.apache.hadoop.util.GenericOptionsParser;
  15.  
  16. import java.io.IOException;
  17. import java.util.function.Function;
  18. import java.util.stream.StreamSupport;
  19.  
  20. import static java.lang.System.err;
  21. import static java.lang.System.exit;
  22.  
  23.  
  24. // START PROGRAMU
  25.  
  26.  
  27. public final class AirlineApp extends Configured {
  28. //
  29. static final class IntArrayWritable extends ArrayWritable{
  30. IntArrayWritable() {
  31. super(IntWritable.class);
  32. }
  33.  
  34. IntWritable get(int index) {
  35. return (IntWritable) this.get()[index];
  36. }
  37.  
  38. int getInt(int index) {
  39. return this.get(index).get();
  40. }
  41. }
  42. //przyjmuje 4 parametry klucz na wejsciu, wartosc na wejsciu, klucz na wyjsciu wartosc na wyjsciu
  43. //to jest object klucz WE WARTOSC WE KLUCZ WY WARTOSC WY
  44. public static final class AirlineMapper extends Mapper<Object, Text, Text, IntArrayWritable> {
  45. private static final String Delimiter = ",";
  46. private static final int CharP = Character.getNumericValue('P');
  47. //pierwsza linia ktora bedzie na maperze to bedzie naglowek wiec na tym etapie pomijamy caly wiersz
  48. private final Text TailNum = new Text();
  49. private final IntWritable AirTime = new IntWritable(); //bierzemy Czas lotu
  50. private final IntWritable Distance = new IntWritable(); //bierzemy dystans
  51.  
  52. //MOJE
  53. private final IntWritable Age = new IntWritable(); //biore wiek
  54. private final
  55.  
  56. private final Writable[] Values = new Writable[]{AirTime, Distance};
  57. private final IntArrayWritable Array = new IntArrayWritable(); //to wszystko wpada do tablicy
  58.  
  59. @Override //OBJECT KEY - NR LINII PLIKU MUSI BYC, TEXT VALUE TO CALY WIERSZ PLIKU, TYPY GENERYCZNY (kontekst ktory musi byc), throws exeption bo musi byc
  60. protected void map(Object key, Text value, Mapper<Object, Text, Text, IntArrayWritable>.Context context) throws IOException, InterruptedException {
  61. if (CharP == value.charAt(0)) {
  62. return;
  63. }
  64.  
  65. String[] values = value.toString().split(Delimiter);
  66. this.TailNum.set(values[10]);
  67.  
  68. try {
  69. this.AirTime.set(Integer.parseInt(values[13]));
  70. this.Distance.set(Integer.parseInt(values[18]));
  71. } catch (NumberFormatException var6) {
  72. return;
  73. }
  74.  
  75. this.Array.set(this.Values);
  76. context.write(this.TailNum, this.Array);
  77. }
  78. }
  79.  
  80. public static class SpeedReducer extends Reducer<Text, IntArrayWritable, Text, FloatWritable> {
  81. private final FloatWritable result = new FloatWritable();
  82.  
  83. @Override
  84. protected void reduce(Text key, Iterable<IntArrayWritable> values, Context context) throws IOException, InterruptedException {
  85. // [0]: totalAirTime, [1]: totalDistance
  86. int[] total = Sum(values, v -> v::getInt);
  87. // miles / minutes -> km / h
  88. result.set(96.5604f * total[1] / total[0]);
  89. context.write(key, result);
  90. }
  91. }
  92.  
  93. public static void main(String[] args) throws Exception {
  94. Configuration conf = new Configuration();
  95. String[] jobArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  96. if (jobArgs.length != 2) {
  97. err.println("Usage: pl.edu.wat.AirlineApp <in> <out>");
  98. System.exit(2);
  99. }
  100.  
  101. Job job = Job.getInstance(conf, "pl.edu.wat.AirlineApp");
  102.  
  103. job.setJarByClass(AirlineApp.class);
  104. job.setMapperClass(AirlineMapper.class);
  105. job.setReducerClass(SpeedReducer.class);
  106. job.setOutputKeyClass(Text.class);
  107. job.setOutputValueClass(FloatWritable.class);
  108. // Same argument types as reducer?
  109. job.setMapOutputKeyClass(Text.class);
  110. job.setMapOutputValueClass(IntArrayWritable.class);
  111.  
  112. FileInputFormat.addInputPath(job, new Path(jobArgs[0]));
  113. FileOutputFormat.setOutputPath(job, new Path(jobArgs[1])); //wyladuje tyle plikow ile bylo reducerow stworzy plik SUCCESS lub FAILED
  114. job.setInputFormatClass(TextInputFormat.class); //plik tekstowy typu wejsciowego
  115. job.setOutputFormatClass(TextOutputFormat.class); //plik tekstowy typu wyjsciowego
  116.  
  117. boolean result = job.waitForCompletion(true);
  118. exit(result ? 0 : 1); //czeka na joba, zbiera logi, czeka i konczy dzialanie
  119. }
  120.  
  121. // mapper - function currying
  122. private static <T> int[] Sum(Iterable<T> values, Function<? super T, Function<Integer, Integer>> mapper) {
  123. int[] result = new int[]{0, 0};
  124. StreamSupport.stream(values.spliterator(), false).forEach(v -> {
  125. Function<Integer, Integer> capture = mapper.apply(v);
  126. result[0] += capture.apply(0);
  127. result[1] += capture.apply(1);
  128. });
  129.  
  130. return result;
  131. }
  132. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement