FancyKing

第1关:统计每个城市的宾馆平均价格

Apr 3rd, 2020
185
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 4.41 KB | None | 0 0
  1. package com.processdata;
  2.  
  3. import java.io.IOException;
  4. import java.util.Scanner;
  5.  
  6. import org.apache.hadoop.conf.Configuration;
  7. import org.apache.hadoop.conf.Configured;
  8. import org.apache.hadoop.hbase.HBaseConfiguration;
  9. import org.apache.hadoop.hbase.client.Connection;
  10. import org.apache.hadoop.hbase.client.Put;
  11. import org.apache.hadoop.hbase.client.Result;
  12. import org.apache.hadoop.hbase.client.Scan;
  13. import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
  14. import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
  15. import org.apache.hadoop.hbase.mapreduce.TableMapper;
  16. import org.apache.hadoop.hbase.mapreduce.TableReducer;
  17. import org.apache.hadoop.hbase.util.Bytes;
  18. import org.apache.hadoop.io.DoubleWritable;
  19. import org.apache.hadoop.io.Text;
  20. import org.apache.hadoop.mapreduce.Job;
  21. import org.apache.hadoop.util.Tool;
  22. import org.apache.hadoop.util.ToolRunner;
  23.  
  24. import com.util.HBaseUtil;
  25.  
  26. /**
  27.  * 使用MapReduce程序处理HBase中的数据并将最终结果存入到另一张表 1中
  28.  */
  29. public class HBaseMapReduce extends Configured implements Tool {
  30.  
  31.     public static class MyMapper extends TableMapper<Text, DoubleWritable> {
  32.         public static final byte[] column = "price".getBytes();
  33.         public static final byte[] family = "hotel_info".getBytes();
  34.  
  35.         @Override
  36.         protected void map(ImmutableBytesWritable rowKey, Result result, Context context)
  37.                 throws IOException, InterruptedException {
  38.             /********** Begin *********/
  39.          
  40.          
  41.             String cityId = Bytes.toString(result.getValue("cityInfo".getBytes(), "cityId".getBytes()));
  42.             byte[] value = result.getValue(family, column);
  43.            
  44.             //获取酒店价格
  45.             //String cityId1 = Bytes.toString(result.getValue("hotel_info".getBytes(), "price".getBytes()));
  46.            
  47.             //将价格转换为double
  48.             Double ho =Double.parseDouble(Bytes.toString(value));
  49.             //将价格转换成()类型
  50.             DoubleWritable i = new DoubleWritable(ho);
  51.             String key = cityId;
  52.             //写出(城市id,酒店价格)
  53.             context.write(new  Text(key),i);
  54.          
  55.          
  56.             /********** End *********/
  57.         }
  58.     }
  59.  
  60.     public static class MyTableReducer extends TableReducer<Text, DoubleWritable, ImmutableBytesWritable> {
  61.         @Override
  62.         public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
  63.             /********** Begin *********/
  64.          
  65.          
  66.             double sum=0;
  67.             int count=0;
  68.             for(DoubleWritable value:values){
  69.                 count++;
  70.                 sum+=value.get();
  71.             }
  72.             double avePrice=sum/count;
  73.             //创建pit对象
  74.             Put put =new Put(Bytes.toBytes(key.toString()));
  75.             put.addColumn("average_infos".getBytes(),"price".getBytes(),Bytes.toBytes(String.valueOf(avePrice)));
  76.            
  77.             context.write(null,put);
  78.          
  79.          
  80.             /********** End *********/
  81.         }
  82.  
  83.     }
  84.  
  85.    
  86.    
  87.    
  88.    
  89.    
  90.     public int run(String[] args) throws Exception {
  91.         //配置Job
  92.         Configuration conf = HBaseConfiguration.create(getConf());
  93.         conf.set("hbase.zookeeper.quorum", "127.0.0.1");  //hbase 服务地址
  94.         conf.set("hbase.zookeeper.property.clientPort", "2181"); //端口号
  95.         Scanner sc = new Scanner(System.in);
  96.         String arg1 = sc.next();
  97.         String arg2 = sc.next();
  98.         //String arg1 = "t_city_hotels_info";
  99.         //String arg2 = "average_table";
  100.         try {
  101.             HBaseUtil.createTable("average_table", new String[] {"average_infos"});
  102.         } catch (Exception e) {
  103.             // 创建表失败
  104.             e.printStackTrace();
  105.         }
  106.         Job job = configureJob(conf,new String[]{arg1,arg2});
  107.         return job.waitForCompletion(true) ? 0 : 1;
  108.     }
  109.  
  110.     private Job configureJob(Configuration conf, String[] args) throws IOException {
  111.         String tablename = args[0];
  112.         String targetTable = args[1];
  113.         Job job = new Job(conf,tablename);
  114.         Scan scan = new Scan();
  115.         scan.setCaching(300);
  116.         scan.setCacheBlocks(false);//在mapreduce程序中千万不要设置允许缓存
  117.         //初始化Mapreduce程序
  118.         TableMapReduceUtil.initTableMapperJob(tablename,scan,MyMapper.class, Text.class, DoubleWritable.class,job);
  119.         //初始化Reduce
  120.         TableMapReduceUtil.initTableReducerJob(
  121.                 targetTable,        // output table
  122.                 MyTableReducer.class,    // reducer class
  123.                 job);
  124.         job.setNumReduceTasks(1);
  125.         return job;
  126.     }
  127. }
Add Comment
Please, Sign In to add comment