Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package com.example
- import org.apache.hadoop.hbase.HBaseConfiguration
- import org.apache.hadoop.hbase.client.HBaseAdmin
- import org.apache.hadoop.hbase.client.HTable
- import org.apache.hadoop.hbase.util.Bytes
- import org.apache.hadoop.hbase.client.Put
- import org.apache.hadoop.hbase.client.Get
- import java.io.IOException
- import org.apache.hadoop.conf.Configuration
- import org.apache.hadoop.hbase._
- import org.apache.hadoop.hbase.client._
- import org.apache.hadoop.hbase.io._
- import org.apache.hadoop.hbase.mapreduce._
- import org.apache.hadoop.io._
- import org.apache.hadoop.mapreduce._
- import scala.collection.JavaConversions._
- import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
/**
 * A name (table, column family or column qualifier) paired with its encoded bytes.
 *
 * The bytes are always UTF-8, so the encoding no longer depends on the JVM's
 * platform-default charset (which `String.getBytes()` without arguments uses).
 *
 * @param name the textual name
 */
case class HString(name: String) {

  /**
   * UTF-8 encoding of [[name]], computed once on first access.
   * The same array instance is handed to every caller, so it must not be mutated.
   */
  lazy val bytes: Array[Byte] = name.getBytes("UTF-8")

  override def toString: String = name
}

/** Implicit conversions so an [[HString]] can be passed where a `String` or `Array[Byte]` is expected. */
object HString {
  import scala.language.implicitConversions

  implicit def hstring2String(src: HString): String = src.name

  implicit def hstring2Bytes(src: HString): Array[Byte] = src.bytes
}
/** HBase column family names. */
object Families {
  val stream: HString = HString("stream")

  val identity: HString = HString("identity")

  /** Family that `treducer` writes its per-row count cell into. */
  val cf: HString = HString("cf")
}
/**
 * HBase column qualifier names.
 *
 * Scala identifiers cannot contain '.', so a qualifier such as `nolimitid.timestamp`
 * is exposed under an underscore-separated field name.
 */
object Qualifiers {
  val title: HString = HString("title")
  val url: HString = HString("url")
  val media: HString = HString("media")
  val media_source: HString = HString("media_source")
  val content: HString = HString("content")

  val nolimitid_timestamp: HString = HString("nolimitid.timestamp")
  val original_id: HString = HString("original_id")
  val timestamp: HString = HString("timestamp")
  val date_created: HString = HString("date_created")

  /** Qualifier under which `treducer` stores the per-row count. */
  val count: HString = HString("count")
}
/** HBase table names. */
object Tables {
  /** Table `raw_stream_1.0.0`; the same name is the default input table of `Hello.main`. */
  val rawstream100: HString = HString("raw_stream_1.0.0")

  /** Table `rawCopy`. */
  val rawstream: HString = HString("rawCopy")
}
/**
 * Map phase: emits `(rowKey, 1)` for every HBase row delivered by the table scan.
 *
 * The row-key bytes are copied into the [[Text]] key verbatim instead of being decoded
 * to a `String` and re-encoded, so keys that are not valid UTF-8 survive unchanged.
 */
class tmapper extends TableMapper[Text, IntWritable] {
  type Contxt = Mapper[ImmutableBytesWritable, Result, Text, IntWritable]#Context

  override def map(row: ImmutableBytesWritable, value: Result, context: Contxt): Unit = {
    // `row.get()` is the writable's backing array; only [offset, offset + length) is this key.
    val rowKey = new Text()
    rowKey.set(row.get(), row.getOffset, row.getLength)
    context.write(rowKey, new IntWritable(1))
  }
}
/**
 * Reduce phase: for each row key, writes the sum of its map values into the output
 * table as the cell `cf:count` (an `Int` encoded with `Bytes.toBytes`).
 *
 * KEYIN is [[Text]] to match the map output key class registered in `Hello.main`.
 * `reduce` takes a `java.lang.Iterable` so it genuinely overrides `Reducer.reduce`.
 * With any other signature it would be a mere overload, and Hadoop would run the
 * inherited identity reducer instead.
 */
class treducer extends TableReducer[Text, IntWritable, ImmutableBytesWritable] {
  type Contxt = Reducer[Text, IntWritable, ImmutableBytesWritable, Mutation]#Context

  override def reduce(key: Text, values: java.lang.Iterable[IntWritable], context: Contxt): Unit = {
    import scala.collection.JavaConverters._

    // Sum rather than count, so the result stays correct if a combiner pre-aggregates.
    val total = values.asScala.foldLeft(0)(_ + _.get())

    // Text.getBytes returns the backing buffer; only its first getLength bytes are the key.
    val rowKey = java.util.Arrays.copyOf(key.getBytes, key.getLength)

    val put = new Put(rowKey)
    put.add(Families.cf.bytes, Qualifiers.count.bytes, Bytes.toBytes(total))
    context.write(new ImmutableBytesWritable(rowKey), put)
  }
}
/**
 * Driver for the "CopyTable" MapReduce job.
 *
 * It scans an input HBase table with [[tmapper]] and writes one `cf:count` cell per
 * row key into an output table via [[treducer]].
 *
 * Usage: `Hello [inputTable] [outputTable]`. Both arguments are optional. They default
 * to `raw_stream_1.0.0` and `test`, the tables this job originally had hard-coded.
 * If the job fails, the JVM exits with status 1.
 */
object Hello {
  /** Value set as `hbase.master`. */
  val hbaseMaster: String = "127.0.0.1:60000"

  /** Value set as `hbase.zookeeper.quorum` (identifier spelling kept for existing callers). */
  val hbaseZookeper: String = "127.0.0.1"

  /** Output table used when no second argument is given. */
  private val DefaultOutputTable = "test"

  def main(args: Array[String]): Unit = {
    val inputTable = args.lift(0).getOrElse(Tables.rawstream100.name)
    val outputTable = args.lift(1).getOrElse(DefaultOutputTable)

    val conf = HBaseConfiguration.create()
    conf.set("hbase.master", hbaseMaster)
    conf.set("hbase.zookeeper.quorum", hbaseZookeper)

    val scan = new Scan()
    scan.setCaching(500)       // 1 is the default in Scan, which will be bad for MapReduce jobs
    scan.setCacheBlocks(false) // don't set to true for MR jobs

    val job = new Job(conf)
    job.setJobName("CopyTable")
    job.setJarByClass(classOf[Hello])

    TableMapReduceUtil.initTableMapperJob(
      inputTable,           // input HBase table name
      scan,                 // Scan instance to control CF and attribute selection
      classOf[tmapper],     // mapper class
      classOf[Text],        // mapper output key class
      classOf[IntWritable], // mapper output value class
      job
    )
    job.setMapperClass(classOf[tmapper])

    TableMapReduceUtil.initTableReducerJob(
      outputTable,       // output HBase table name
      classOf[treducer], // reducer class
      job
    )
    job.setReducerClass(classOf[treducer])
    job.setNumReduceTasks(1)

    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Put])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, outputTable)

    val succeeded = job.waitForCompletion(true)
    println("Mapprogress=" + job.mapProgress())
    // Report failure to the caller (shell, scheduler) instead of always exiting with 0.
    if (!succeeded) sys.exit(1)
  }
}
/** Companion class of the `Hello` driver; `Hello.main` passes `classOf[Hello]` to `job.setJarByClass`. */
class Hello
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement