package com.example

import java.io.IOException

import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io._
import org.apache.hadoop.hbase.mapreduce._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.io._
import org.apache.hadoop.mapreduce._

import scala.collection.JavaConversions._
// Thin wrapper around a String that caches its byte representation, so that
// table / column family / qualifier names can be reused without repeated
// getBytes calls.
case class HString(name: String) {
  lazy val bytes = name.getBytes
  override def toString = name
}

object HString {
  import scala.language.implicitConversions
  // Implicit views so an HString can be passed wherever the HBase API expects
  // a String or an Array[Byte].
  implicit def hstring2String(src: HString): String = src.name
  implicit def hstring2Bytes(src: HString): Array[Byte] = src.bytes
}
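// A quick sketch of how the implicit views are meant to be used (the row key
// and value below are made-up examples, not part of the job itself):
//   val put = new Put(Bytes.toBytes("row-1"))
//   put.add(Families.stream, Qualifiers.title, Bytes.toBytes("A title"))
// hstring2Bytes supplies the Array[Byte] family and qualifier arguments that
// Put.add expects.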
// Column family names.
object Families {
  val stream = HString("stream")
  val identity = HString("identity")
}

// Column qualifier names.
object Qualifiers {
  val title = HString("title")
  val url = HString("url")
  val media = HString("media")
  val media_source = HString("media_source")
  val content = HString("content")
  val nolimitid_timestamp = HString("nolimitid.timestamp")
  val original_id = HString("original_id")
  val timestamp = HString("timestamp")
  val date_created = HString("date_created")
  val count = HString("count")
}

// Source ("raw_stream_1.0.0") and destination ("rawCopy") tables for the copy job.
object Tables {
  val rawstream100 = HString("raw_stream_1.0.0")
  val rawstream = HString("rawCopy")
}
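// Note: TableOutputFormat does not create tables, so the destination table
// must already exist with the required column families before the job runs.
// For example, from the HBase shell (assuming the copy needs the same two
// families as the source): create 'rawCopy', 'stream', 'identity'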
// Mapper: for every row of the source table, re-emit all of its cells as a
// single Put keyed by the stringified row key.
class tmapper extends TableMapper[Text, Put] {
  override def map(row: ImmutableBytesWritable, value: Result,
                   context: Mapper[ImmutableBytesWritable, Result, Text, Put]#Context): Unit = {
    val key = Bytes.toString(row.get())
    val put = new Put(row.get())
    for (kv <- value.raw()) {
      put.add(kv) // copy each KeyValue unchanged into the new Put
    }
    context.write(new Text(key), put)
  }
}
// Reducer: pass-through. Every Put produced by the mapper is written to the
// destination table unchanged. (On older HBase releases the last type
// parameter of the Reducer context below is Writable rather than Mutation.)
class treducer extends TableReducer[Text, Put, Put] {
  override def reduce(row: Text, values: java.lang.Iterable[Put],
                      context: Reducer[Text, Put, Put, Mutation]#Context): Unit = {
    for (put <- values) {     // JavaConversions turns the Java Iterable into a Scala one
      context.write(put, put) // TableOutputFormat ignores the key and writes the Put
    }
  }
}
object Hello {
  val hbaseMaster = "127.0.0.1:60000"
  val hbaseZookeeper = "127.0.0.1"

  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.master", hbaseMaster)
    conf.set("hbase.zookeeper.quorum", hbaseZookeeper)

    // Not used by the job below.
    val hbaseAdmin = new HBaseAdmin(conf)

    val scan = new Scan()
    scan.setCaching(500)       // the default of 1 is far too small for MapReduce jobs
    scan.setCacheBlocks(false) // don't fill the block cache from a full-table MR scan

    val job = new Job(conf)
    job.setJobName("CopyTable")
    job.setJarByClass(classOf[Hello])

    TableMapReduceUtil.initTableMapperJob(
      Tables.rawstream100.name, // input HBase table name
      scan,                     // Scan instance to control CF and attribute selection
      classOf[tmapper],         // mapper class
      classOf[Text],            // mapper output key class (matches TableMapper[Text, Put])
      classOf[Put],             // mapper output value class
      job
    )

    // initTableReducerJob also sets the reducer class, TableOutputFormat and
    // the output table name on the job configuration.
    TableMapReduceUtil.initTableReducerJob(
      Tables.rawstream.name, // output table name
      classOf[treducer],     // reducer class
      job
    )
    job.setNumReduceTasks(1)

    if (!job.waitForCompletion(true)) {
      throw new IOException("error within job!")
    }
  }
}

// Empty class so that classOf[Hello] (used by setJarByClass above) compiles.
class Hello {}
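// Running the job (a sketch; the jar name and paths are assumptions, not part
// of the original paste): package the classes into a jar, put the HBase jars
// on the Hadoop classpath, and submit with the hadoop launcher, e.g.
//   HADOOP_CLASSPATH=$(hbase classpath) hadoop jar hello-copytable.jar com.example.Hello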