Advertisement
Guest User

Untitled

a guest
Oct 22nd, 2019
96
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.23 KB | None | 0 0
  1. package org.myorg;
  2.  
  3. import java.io.IOException;
  4. import java.util.*;
  5.  
  6. import org.apache.hadoop.fs.Path;
  7. import org.apache.hadoop.conf.*;
  8. import org.apache.hadoop.io.*;
  9. import org.apache.hadoop.mapred.*;
  10. import org.apache.hadoop.util.*;
  11.  
  12. public class FilterData {
  13.  
  14. public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
  15. private Text ipadr = new Text();
  16. private Text monthFile = new Text();
  17.  
  18. public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
  19. String line = value.toString();
  20. String[] temp = line.split("\"", 0);
  21. if (temp.length == 3){
  22. String[] temp2 = temp[1].split(" ", 0);
  23. String[] temp3 = temp[0].split(" ", 0);
  24. String[] temp4 = temp3[3].split("/", 0);
  25. monthFile.set(temp4[1] + temp2[1]);
  26. ipadr.set(temp3[0]);
  27. output.collect(monthFile, ipadr);
  28. }
  29. }
  30. }
  31.  
  32. public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
  33. Text result = new Text();
  34.  
  35. public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
  36. String temp = "[";
  37. TreeSet <String> ts = new TreeSet<String>();
  38. while (values.hasNext()) {
  39. s.add(values.next().toString());
  40. if (temp.equals("["))
  41. temp = temp + values.next().toString();
  42. else
  43. temp = temp + " " + values.next().toString();
  44. }
  45. for (String s : ts){
  46. if (temp.equals("["))
  47. temp = temp + s;
  48. else
  49. temp = temp + " " + s;
  50. }
  51. temp = temp + "]";
  52. result.set(temp);
  53. output.collect(key, result);
  54. }
  55. }
  56.  
  57. public static void main(String[] args) throws Exception {
  58. JobConf conf = new JobConf(FilterData.class);
  59. conf.setJobName("filterdata");
  60.  
  61. conf.setOutputKeyClass(Text.class);
  62. conf.setOutputValueClass(Text.class);
  63.  
  64. conf.setMapperClass(Map.class);
  65. conf.setCombinerClass(Reduce.class);
  66. conf.setReducerClass(Reduce.class);
  67.  
  68. conf.setInputFormat(TextInputFormat.class);
  69. conf.setOutputFormat(TextOutputFormat.class);
  70.  
  71. FileInputFormat.setInputPaths(conf, new Path(args[0]));
  72. FileOutputFormat.setOutputPath(conf, new Path(args[1]));
  73.  
  74. JobClient.runJob(conf);
  75. }
  76. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement