FancyKing

Level 2: Merge file contents and remove duplicates

Mar 19th, 2020
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @author FancyKing
 */
public class Merge {

    /**
     * @param args
     * Merge files A and B, remove the duplicate content, and produce a new output file C.
     */
    // Override the map function here: copy the input value directly into the output key.
    // Note that the map method must declare: throws IOException, InterruptedException

    /********** Begin **********/
    public static class Map extends Mapper<Object, Text, Text, Text> {
        private Text text = new Text();

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // Copy the whole input line into the output key; the output value stays empty.
            text.set(value);
            context.write(text, new Text(""));
        }
    }
    /********** End **********/

    // Override the reduce function here: copy the input key directly into the output key.
    // Note that the reduce method must declare: throws IOException, InterruptedException
    /********** Begin **********/
    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // All copies of a line arrive under the same key, so emitting the key once removes the duplicates.
            context.write(key, new Text(""));
        }
    }
    /********** End **********/

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // "fs.default.name" is deprecated; "fs.defaultFS" is the current name of the same setting.
        conf.set("fs.defaultFS", "hdfs://localhost:9000");

        Job job = Job.getInstance(conf, "Merge and duplicate removal");
        job.setJarByClass(Merge.class);
        job.setMapperClass(Map.class);
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        String inputPath = "/user/tmp/input/";    // set the input path here
        String outputPath = "/user/tmp/output/";  // set the output path here

        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
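To make the intended result concrete, here is a minimal single-machine Java sketch of the same merge-and-deduplicate semantics. It is not part of the exercise solution above; the local file names A, B, and C are assumptions standing in for the HDFS input and output paths.

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Set;
import java.util.TreeSet;

// Plain-Java sketch: merge the lines of A and B, drop duplicates,
// and write them out in sorted order -- the same result the MapReduce
// job produces via the shuffle/sort plus the identity-style reducer.
public class MergeLocal {
    public static void main(String[] args) throws Exception {
        Set<String> lines = new TreeSet<>();                 // sorted, duplicate-free
        lines.addAll(Files.readAllLines(Paths.get("A")));    // local stand-in for input file A
        lines.addAll(Files.readAllLines(Paths.get("B")));    // local stand-in for input file B
        Files.write(Paths.get("C"), lines);                  // merged, deduplicated output C
    }
}

In the MapReduce version the deduplication comes from the shuffle phase: every identical line lands under one key, so the reducer sees each distinct line once and writes it once. The job itself would be packaged into a jar and submitted with hadoop jar, after which the merged result appears under the configured output path (typically in a part-r-00000 file).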