Advertisement
Guest User

Untitled

a guest
Oct 22nd, 2019
94
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.27 KB | None | 0 0
  1. package org.myorg;
  2.  
  3. import java.io.IOException;
  4. import java.util.*;
  5.  
  6. import org.apache.hadoop.fs.Path;
  7. import org.apache.hadoop.conf.*;
  8. import org.apache.hadoop.io.*;
  9. import org.apache.hadoop.mapred.*;
  10. import org.apache.hadoop.util.*;
  11.  
  12. public class FilterData {
  13.  
  14. public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
  15. private Text ipadr = new Text();
  16. private Text monthFile = new Text();
  17.  
  18. public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
  19. String line = value.toString();
  20. if (line != null){
  21. String[] temp = line.split("\"", 0);
  22. if (temp.length == 3){
  23. String[] temp2 = temp[1].split(" ", 0);
  24. String[] temp3 = temp[0].split(" ", 0);
  25. String[] temp4 = temp3[3].split("/", 0);
  26. monthFile.set(temp4[1] + temp2[1]);
  27. ipadr.set(temp3[0]);
  28. output.collect(monthFile, ipadr);
  29. }
  30. }
  31. }
  32. }
  33.  
  34. public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
  35. Text result = new Text();
  36.  
  37. public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
  38. String temp = "[";
  39. TreeSet <String> ts = new TreeSet<String>();
  40. while (values.hasNext()) {
  41. s.add(values.next().toString());
  42. if (temp.equals("["))
  43. temp = temp + values.next().toString();
  44. else
  45. temp = temp + " " + values.next().toString();
  46. }
  47. for (String s : ts){
  48. if (temp.equals("["))
  49. temp = temp + s;
  50. else
  51. temp = temp + " " + s;
  52. }
  53. temp = temp + "]";
  54. result.set(temp);
  55. output.collect(key, result);
  56. }
  57. }
  58.  
  59. public static void main(String[] args) throws Exception {
  60. JobConf conf = new JobConf(FilterData.class);
  61. conf.setJobName("filterdata");
  62.  
  63. conf.setOutputKeyClass(Text.class);
  64. conf.setOutputValueClass(Text.class);
  65.  
  66. conf.setMapperClass(Map.class);
  67. conf.setCombinerClass(Reduce.class);
  68. conf.setReducerClass(Reduce.class);
  69.  
  70. conf.setInputFormat(TextInputFormat.class);
  71. conf.setOutputFormat(TextOutputFormat.class);
  72.  
  73. FileInputFormat.setInputPaths(conf, new Path(args[0]));
  74. FileOutputFormat.setOutputPath(conf, new Path(args[1]));
  75.  
  76. JobClient.runJob(conf);
  77. }
  78. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement