Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package com.example
- import org.apache.hadoop.hbase.HBaseConfiguration
- import org.apache.hadoop.hbase.client.HBaseAdmin
- import org.apache.hadoop.hbase.client.HTable
- import org.apache.hadoop.hbase.util.Bytes
- import org.apache.hadoop.hbase.client.Put
- import org.apache.hadoop.hbase.client.Get
- import java.io.IOException
- import org.apache.hadoop.conf.Configuration
- import org.apache.hadoop.hbase._
- import org.apache.hadoop.hbase.client._
- import org.apache.hadoop.hbase.io._
- import org.apache.hadoop.hbase.mapreduce._
- import org.apache.hadoop.io._
- import org.apache.hadoop.mapreduce._
- import scala.collection.JavaConversions._
/**
 * Thin wrapper around a String that caches its byte representation, so the
 * same value can be handed to HBase APIs expecting either form.
 *
 * Fix: the original used `name.getBytes` with no charset, which encodes with
 * the JVM's platform-default charset — row keys/qualifiers would differ
 * between machines. Encode explicitly as UTF-8.
 */
final case class HString(name: String) {
  // Computed once on first use; Array[Byte] is mutable, callers must not modify it.
  lazy val bytes: Array[Byte] = name.getBytes("UTF-8")
  override def toString: String = name
}
/**
 * Implicit widenings so an [[HString]] can be passed directly where the HBase
 * API expects a plain String or a byte array.
 *
 * Method names are kept stable: callers may invoke them explicitly.
 */
object HString {
  import scala.language.implicitConversions

  /** View an HString as its underlying name. */
  implicit def hstring2String(src: HString): String = src.name

  /** View an HString as its cached byte encoding. */
  implicit def hstring2Bytes(src: HString): Array[Byte] = src.bytes
}
/** Column-family names used by the stream tables. */
object Families {
  val stream: HString   = HString("stream")
  val identity: HString = HString("identity")
}
/** Column qualifiers present in the raw stream rows. */
object Qualifiers {
  val title: HString               = HString("title")
  val url: HString                 = HString("url")
  val media: HString               = HString("media")
  val media_source: HString        = HString("media_source")
  val content: HString             = HString("content")
  val nolimitid_timestamp: HString = HString("nolimitid.timestamp")
  val original_id: HString         = HString("original_id")
  val timestamp: HString           = HString("timestamp")
  val date_created: HString        = HString("date_created")
  val count: HString               = HString("count")
}
/** HBase table names: the versioned source table and its copy target. */
object Tables {
  val rawstream100: HString = HString("raw_stream_1.0.0")
  val rawstream: HString    = HString("rawstream")
}
- /*class tmapper extends TableMapper[Text, LongWritable]{
- type Context = Mapper[ImmutableBytesWritable, Result, Text, LongWritable]
- val word = new Text
- val one = new LongWritable(1)
- def map (key: ImmutableBytesWritable, value: Result, ctxt: Context) = {
- val cell = value.getColumnLatest(Families.stream , Qualifiers.content)
- val text = new String(cell.getValue()).toLowerCase
- val tokenizer = new StringTokenizer(text)
- while (tokenizer.hasMoreTokens) {
- val term = tokenizer.nextToken
- if (term.matches("[a-zA-Z0-9]+")) {
- word.set(term)
- //write(word, one)
- }
- }
- }
- }*/
- /*class treducer extends TableReducer[Text, LongWritable, ImmutableBytesWritable] {
- type Context = Reducer[Text, LongWritable, ImmutableBytesWritable, Mutation]
- def reduce(key: Text,values: java.lang.Iterable[LongWritable],context: Context) = {
- val count = values.foldLeft(0L) { (tally, i) => tally + i.get }
- val put = new Put(key.toString.getBytes, count) // be sure to comment on toString.getBytes
- put.add(Families.content, Qualifiers.count, Bytes.toBytes(count))
- context.write(null, put)
- }
- }*/
/**
 * Identity mapper for the table-copy job: each scanned row is re-emitted as a
 * Put containing every KeyValue of the source Result, keyed by the row itself.
 */
class tmapper extends TableMapper[ImmutableBytesWritable, Put] {

  /** Builds a Put for `key` carrying every cell of `result`, unchanged. */
  def resultToPut(key: ImmutableBytesWritable, result: Result): Put = {
    val put = new Put(key.get())
    // NOTE(review): raw() is the pre-0.96 accessor; matches the old-style
    // HBase API (HBaseAdmin etc.) used throughout this file.
    result.raw().foreach(kv => put.add(kv))
    put
  }

  def map(row: ImmutableBytesWritable, value: Result, context: Context) {
    context.write(row, resultToPut(row, value))
  }
}
/**
 * Entry point: runs a MapReduce "CopyTable" job that copies every row of
 * `raw_stream_1.0.0` into `rawstream` using [[tmapper]] as an identity mapper
 * and the default identity table reducer.
 */
object Hello {
  // Cluster endpoints; hard-coded to localhost — consider lifting into args/config.
  val hbaseMaster = "127.0.0.1:60000"
  val hbaseZookeper = "127.0.0.1"

  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.master", hbaseMaster)
    conf.set("hbase.zookeeper.quorum", hbaseZookeper)

    // Fix: the admin handle opens a cluster connection but was never closed
    // (its only use, the snapshot/clone step, is commented out below).
    // Close it in a finally block so the connection is not leaked.
    val hbaseAdmin = new HBaseAdmin(conf)
    try {
      //check table
      /*if(!hbaseAdmin.isTableAvailable("raw_stream_1.0.0")) {
        println("Table doesn't exist..quitting")
      }else{
        println("Table exists..")
        try{
          hbaseAdmin.snapshot("raw-snapshot", "raw_stream_1.0.0")
          hbaseAdmin.cloneSnapshot("raw-snapshot", "raw_stream")
        } catch {
          case ioe : IOException => println("Error IO "+ioe)
        }
      }*/
    } finally {
      hbaseAdmin.close()
    }

    val job = Job.getInstance(conf, "CopyTable")
    // setJarByClass needs a Class token, hence the empty `class Hello` marker.
    job.setJarByClass(classOf[Hello])

    val scan = new Scan()
    scan.setCaching(500) // 1 is the default in Scan, which will be bad for MapReduce jobs
    scan.setCacheBlocks(false) // don't set to true for MR jobs

    TableMapReduceUtil.initTableMapperJob(
      Tables.rawstream100.bytes, // input HBase table name
      scan, // Scan instance to control CF and attribute selection
      classOf[tmapper], // mapper class
      null, // mapper output key class
      null, // mapper output value class
      job
    )
    TableMapReduceUtil.initTableReducerJob(
      Tables.rawstream, // output table; null reducer => IdentityTableReducer
      null, // Reducer class
      job
    )

    // Block until the job finishes; surface failure to the caller.
    if (!job.waitForCompletion(true)) {
      throw new IOException("error with job!")
    }
  }
}
- class Hello {}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement