FancyKing

第2关:统计酒店评论中词频较高的词

Apr 3rd, 2020
190
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 4.08 KB | None | 0 0
  1. package com.processdata;
  2. import java.io.IOException;
  3. import java.util.List;
  4. import java.util.Scanner;
  5. import org.apache.hadoop.conf.Configuration;
  6. import org.apache.hadoop.conf.Configured;
  7. import org.apache.hadoop.hbase.HBaseConfiguration;
  8. import org.apache.hadoop.hbase.client.Connection;
  9. import org.apache.hadoop.hbase.client.Put;
  10. import org.apache.hadoop.hbase.client.Result;
  11. import org.apache.hadoop.hbase.client.Scan;
  12. import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
  13. import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
  14. import org.apache.hadoop.hbase.mapreduce.TableMapper;
  15. import org.apache.hadoop.hbase.mapreduce.TableReducer;
  16. import org.apache.hadoop.hbase.util.Bytes;
  17. import org.apache.hadoop.io.IntWritable;
  18. import org.apache.hadoop.io.Text;
  19. import org.apache.hadoop.mapreduce.Job;
  20. import org.apache.hadoop.util.Tool;
  21. import org.apache.hadoop.util.ToolRunner;
  22. import org.apdplat.word.WordSegmenter;
  23. import org.apdplat.word.segmentation.Word;
  24. import com.util.HBaseUtil;
  25. import com.vdurmont.emoji.EmojiParser;
  26.  
  27. /**
  28.  * 词频统计
  29.  *
  30.  */
  31. public class WorldCountMapReduce extends Configured implements Tool {
  32.    
  33.  
  34.     public static class MyMapper extends TableMapper<Text, IntWritable> {
  35.         private static byte[] family = "comment_info".getBytes();
  36.         private static byte[] column = "content".getBytes();
  37.        
  38.         @Override
  39.         protected void map(ImmutableBytesWritable rowKey, Result result, Context context)
  40.                 throws IOException, InterruptedException {
  41.             /********** Begin *********/
  42.          
  43.          
  44.             byte[] value = result.getValue(family, column);
  45.             String word = new String(value,"utf-8");
  46.             if(!word.isEmpty()){
  47.                 String filter = EmojiParser.removeAllEmojis(word);
  48.                 List<Word> segs = WordSegmenter.seg(filter);
  49.                 for(Word cont : segs) {
  50.                     Text text = new Text(cont.getText());
  51.                     IntWritable v = new IntWritable(1);
  52.                     context.write(text,v);
  53.                 }
  54.             }
  55.          
  56.          
  57.             /********** End *********/
  58.         }
  59.     }
  60.  
  61.     public static class MyReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
  62.         private static byte[] family =  "word_info".getBytes();
  63.         private static byte[] column = "count".getBytes();
  64.        
  65.         @Override
  66.         public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
  67.             /********** Begin *********/
  68.          
  69.          
  70.             int sum = 0;
  71.             for (IntWritable value : values) {
  72.                 sum += value.get();
  73.             }
  74.             Put put = new Put(Bytes.toBytes(key.toString()));
  75.             put.addColumn(family,column,Bytes.toBytes(sum));
  76.             context.write(null,put);
  77.          
  78.          
  79.             /********** End *********/
  80.         }
  81.  
  82.     }
  83.  
  84.    
  85.    
  86.    
  87.    
  88.    
  89.     public int run(String[] args) throws Exception {
  90.         //配置Job
  91.         Configuration conf = HBaseConfiguration.create(getConf());
  92.         conf.set("hbase.zookeeper.quorum", "127.0.0.1");  //hbase 服务地址
  93.         conf.set("hbase.zookeeper.property.clientPort", "2181"); //端口号
  94.         Scanner sc = new Scanner(System.in);
  95.         String arg1 = sc.next();
  96.         String arg2 = sc.next();
  97.         try {
  98.             HBaseUtil.createTable("comment_word_count", new String[] {"word_info"});
  99.         } catch (Exception e) {
  100.             // 创建表失败
  101.             e.printStackTrace();
  102.         }
  103.         Job job = configureJob(conf,new String[]{arg1,arg2});
  104.         return job.waitForCompletion(true) ? 0 : 1;
  105.     }
  106.  
  107.     private Job configureJob(Configuration conf, String[] args) throws IOException {
  108.         String tablename = args[0];
  109.         String targetTable = args[1];
  110.         Job job = new Job(conf,tablename);
  111.         Scan scan = new Scan();
  112.         scan.setCaching(300);
  113.         scan.setCacheBlocks(false);//在mapreduce程序中千万不要设置允许缓存
  114.         //初始化Mapper Reduce程序
  115.         TableMapReduceUtil.initTableMapperJob(tablename,scan,MyMapper.class, Text.class, IntWritable.class,job);
  116.         TableMapReduceUtil.initTableReducerJob(targetTable,MyReducer.class,job);
  117.         job.setNumReduceTasks(1);
  118.         return job;
  119.     }
  120.  
  121. }
Add Comment
Please, Sign In to add comment