Advertisement
pavan0591

Untitled

Aug 25th, 2013
82
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 16.99 KB | None | 0 0
  1. public class SampleJob {
  2.    
  3.         public static String ViewersTable=null;
  4.         public static String ViewersTable_ColumnFamily=null;
  5.         public static String ViewersTable_ColumnQualifier=null;
  6.         public static String ContentidxTable=null;
  7.         public static String ContentidxTable_ColumnFamily=null;
  8.         public static String ContentidxTable_ColumnQualifier=null;
  9.         public static String ContentTable=null;
  10.         public static String ContentTable_ColumnFamily=null;
  11.         public static String ContentTable_ColumnQualifier=null;
  12.         public static String OutputTable=null;
  13.         public static String OutputTable_ColumnQualifier=null;
  14.         public static String OutputTable_ColumnFamily=null;
  15.         public static String HouseHoldTable=null;
  16.         public static String HouseHoldTable_ColumnQualifier=null;
  17.         public static String HouseHoldTable_ColumnFamily=null;
  18.         static HashSet<Long> set = new HashSet<Long>();
  19.     /**
  20.    * Implements the Mapper that reads the data and extracts the
  21.    * required information.
  22.    */
  23.   static class AnalyzeMapper extends TableMapper<Text, IntWritable>  {
  24.  
  25.       private JSONParser parser = new JSONParser();
  26.       private IntWritable ONE = new IntWritable(1);
  27.       HashSet<String> hs = new HashSet<String>();
  28.       public static byte[] name, duration,start,genre,type;
  29.       static String start_string;
  30.         static String[] date;
  31.         static String act_date;
  32.         static String time;
  33.       public static String return_value;
  34.       String mapper_output,reducer_input;
  35.       private Text text = new Text();
  36.      
  37.     /**
  38.      * Maps the input. Join of 3 HBase Tables.
  39.      *
  40.      * @param row The row key.
  41.      * @param columns The columns of the row.
  42.      * @param context The task context.
  43.      * @throws java.io.IOException When mapping the input fails.
  44.      */
  45.     // vv AnalyzeData
  46.     @Override
  47.     public void map(ImmutableBytesWritable row, Result columns, Context context)
  48.     throws IOException {
  49.         try {
  50.             //getHouseHoldID();
  51.             for (KeyValue kv : columns.list()) {
  52.                 String value = Bytes.toStringBinary(kv.getValue());
  53.                 //hs.add(Bytes.toStringBinary(kv.getValue()));
  54.                
  55.                 /* parsing the json for contentID and HouseholdID*/
  56.                
  57.                 JSONObject jsonObj = (JSONObject) parser.parse(value);
  58.                 JSONObject payload = (JSONObject) jsonObj.get("payload"); //for parsing contentID
  59.                 Long contentId = (Long) payload.get("contentId"); //get the contentId
  60.                 String channelId = (String) payload.get("channelId"); //channelId
  61.                 String personId = (String) payload.get("personId"); //personId
  62.                 String contentID = String.valueOf(contentId); //cast it to a string
  63.                 String televisionID = (String) payload.get("televisionId"); //televisionId
  64.                 JSONObject header = (JSONObject) jsonObj.get("header"); //for parsing HouseholdID
  65.                 String HouseHoldId = (String) header.get("identifier"); //get the householdID
  66.                 long timestamp = (Long) header.get("timestamp");
  67.                
  68.                 /* end parsing of contentID and HouseholdID*/
  69.                
  70.                 //if parsing is not needed, send the variable 'value' directly to contentjoin method
  71.                
  72.                 mapper_output = contentidxjoin(contentID);
  73.                 reducer_input = HouseHoldId +" "+mapper_output +" "+channelId+" "+personId+" "+televisionID +" "+timestamp;
  74.                 if(reducer_input!=null) {
  75.                 text.set(reducer_input);
  76.                 //System.out.println("Reducer input is " + reducer_input);
  77.                 context.write(text,ONE);
  78.                 }
  79.             }
  80.         }
  81.         catch (Exception e) {
  82.             e.printStackTrace();
  83.         }
  84.   }
  85.     public static String contentidxjoin(String contentId) {
  86.           Configuration conf = HBaseConfiguration.create();
  87.           conf.set("hbase.zookeeper.quorum", "10.10.100.187");
  88.           @SuppressWarnings("resource")
  89.         HTable table;
  90.         try {
  91.             table = new HTable(conf, ContentidxTable);
  92.             if(table!= null) {
  93.             Get get1 = new Get(Bytes.toBytes(contentId));
  94.             get1.addColumn(Bytes.toBytes(ContentidxTable_ColumnFamily), Bytes.toBytes(ContentidxTable_ColumnQualifier));
  95.             Result result1 = table.get(get1);
  96.             byte[] val1 = result1.getValue(Bytes.toBytes(ContentidxTable_ColumnFamily),
  97.                   Bytes.toBytes(ContentidxTable_ColumnQualifier));
  98.           //System.out.println("Value: " + Bytes.toString(val1));
  99.             return_value = contentjoin(Bytes.toString(val1),contentId);
  100.             }
  101.         }
  102.        
  103.         catch (Exception e) {
  104.             e.printStackTrace();
  105.         }
  106.         return return_value;
  107.        
  108.       }
  109.     public static String contentjoin(String value,String contentID) {
  110.        
  111.           Configuration conf = HBaseConfiguration.create();
  112.         conf.set("hbase.zookeeper.quorum", "10.10.100.187");
  113.           @SuppressWarnings("resource")
  114.         HTable table;
  115.         try {
  116.               table = new HTable(conf, ContentTable);
  117.               if(table!=null) {
  118.               Get get1 = new Get(Bytes.toBytes(value));
  119.               get1.addFamily(Bytes.toBytes(ContentTable_ColumnFamily));
  120.               Result result1 = table.get(get1);
  121.              
  122.               //print out the name
  123.               name = result1.getValue(Bytes.toBytes(ContentTable_ColumnFamily),
  124.                       Bytes.toBytes("name"));
  125.               //print out the duration
  126.               duration = result1.getValue(Bytes.toBytes(ContentTable_ColumnFamily),
  127.                       Bytes.toBytes("duration"));
  128.               //print out start time
  129.               start = result1.getValue(Bytes.toBytes(ContentTable_ColumnFamily),
  130.                       Bytes.toBytes("start"));
  131.               start_string = Bytes.toString(start);
  132.               date = start_string.split("T");
  133.               act_date = date[0];
  134.               time = date[1];
  135.               time = time.replace("Z", "");
  136.               //print out genre
  137.               genre = result1.getValue(Bytes.toBytes(ContentTable_ColumnFamily),
  138.                       Bytes.toBytes("genre"));
  139.             //print out type
  140.               type = result1.getValue(Bytes.toBytes(ContentTable_ColumnFamily),
  141.                       Bytes.toBytes("type"));
  142.               }
  143.         }
  144.         catch (Exception e) {
  145.             e.printStackTrace();
  146.         }
  147.         return contentID + " "+ Bytes.toString(name).replaceAll("\\s","") +" "+ Bytes.toString(duration) +" "+ act_date +" "+ time +" "+ Bytes.toString(genre)+" "+ Bytes.toString(type);
  148.         //return Bytes.toString(duration) + " "+ Bytes.toString(name) +" "+ contentID +" "+ Bytes.toString(start) +" "+ Bytes.toString(genre)+" "+ Bytes.toString(type);
  149.     }
  150.   }
  151.   /*
  152.    * Reducer to implement GroupBY of the mapper output and dump the result to a file.
  153.   */  
  154. public static class AnalyzeReducerTable  
  155.   extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
  156.       private JSONParser parser = new JSONParser();
  157.       public static final byte[] HouseHold_CF = HouseHoldTable_ColumnFamily.getBytes();
  158.       public static final byte[] HouseHold_a = HouseHoldTable_ColumnQualifier.getBytes();
  159.       int count = 0;
  160.       String[] splitStr;
  161.       public static final byte[] CF = OutputTable_ColumnFamily.getBytes();
  162.       int[] time_slot = {0,2,4,7,9,12,14,17,19,21,24};
  163.       public static final byte[] a = OutputTable_ColumnQualifier.getBytes();
  164.       int key_row_count = 1;
  165.       int row_count = 1;
  166.       String current_HouseHoldId="";
  167.    
  168.       @Override
  169.     protected void reduce(Text key, Iterable<IntWritable> values,
  170.       Context context) throws IOException, InterruptedException {
  171.       Configuration HouseHold_conf = HBaseConfiguration.create();
  172.     HouseHolds_conf.set("hbase.zookeeper.quorum", "10.10.100.187");
  173.       @SuppressWarnings("resource")
  174.       HTable HBase_HouseHoldtable = new HTable(HouseHold_conf, HouseHoldTable);
  175.    
  176.       String column_family = "";
  177.       int min=0,max=0;
  178.       JSONObject obj = new JSONObject();
  179.       JSONObject obj1 = new JSONObject();
  180.       JSONObject start_end_time = new JSONObject();
  181.       try{
  182.           for (IntWritable one : values) count++;
  183.           String base = key.toString();
  184.           if(base!=null) {
  185.           splitStr = base.split("\\s+");
  186.           System.out.println(key.toString());
  187.           if(!(current_HouseHoldId.equals(splitStr[0]))) {
  188.               set.clear();
  189.           }
  190.          
  191.           SimpleDateFormat sdf = new SimpleDateFormat("MM-dd-yyyy EEE HH:mm:ss aaa");
  192.           sdf.setTimeZone(new SimpleTimeZone(SimpleTimeZone.UTC_TIME, "UTC"));
  193.           String date1 = sdf.format(new java.util.Date (unix_time));
  194.           //System.out.println(unix_time/1000);
  195.           String[] time_split = date1.split(" ");
  196.           String[] time_bin  = time_split[2].split(":");
  197.           int hr = Integer.parseInt(time_bin[0]);
  198.           for(int i=0;i<time_slot.length;i++){
  199.               if(time_slot[i]<=hr && time_slot[i+1]>=hr){
  200.                   min = time_slot[i];
  201.                   max = time_slot[i+1];
  202.               }
  203.           }
  204.           String session_time_bin = "TIME_" + String.valueOf(min) + "_TO_" +String.valueOf(max);
  205.           String row_key;
  206.           row_key = splitStr[0] +"-"+ (hr_bin);
  207.          
  208.           //byte[] row_key_byte = HouseHoldId_byte + Bytes.toBytes("-") + hr_bin_byte;
  209.           byte[] start_time=null;
  210.           //get operation
  211.           Get get = new Get(Bytes.toBytes(splitStr[0]));
  212.           get.addColumn(HouseHold_CF, HouseHold_a);
  213.           Result result1 = HBase_HouseHoldtable.get(get);
  214.           if(result1!= null) {
  215.           start_time = result1.getValue(HouseHold_CF,
  216.                   HouseHold_a);
  217.           }
  218.           if(Bytes.toString(start_time) != null) {
  219.               //System.out.println(Bytes.toString(start_time));
  220.           Put HouseHoldput = new Put(Bytes.toBytes(splitStr[0]));
  221.           String value1 = Bytes.toString(start_time);
  222.           JSONObject jsonObj = (JSONObject) parser.parse(value1);
  223.             long start_actual_time = (Long) jsonObj.get("start_time");
  224.             try {
  225.                 long end_actual_time = (Long) jsonObj.get("end_time");
  226.                 set.add(end_actual_time);    
  227.                 System.out.println("End time" + end_actual_time);
  228.                 }
  229.             catch(Exception e) {
  230.                
  231.             }
  232.             System.out.println("Start time" + start_actual_time);
  233.             set.add(start_actual_time);
  234.             set.add(hr_bin);
  235.             //myList.add(hr_bin);
  236.             Object array_min = Collections.min(set);
  237.             Object array_max = Collections.max(set);
  238.             if(!(array_min.equals(array_max))) {
  239.                 start_end_time.put("start_time", array_min);
  240.                 start_end_time.put("end_time", array_max);
  241.             } else {
  242.                 start_end_time.put("start_time", array_min);
  243.             }
  244.               HouseHoldput.add(HouseHold_CF,HouseHold_a,Bytes.toBytes(start_end_time.toString()));
  245.               HBase_HouseHoldtable.put(HouseHoldput);
  246.           } else {
  247.               Put HouseHoldput = new Put(Bytes.toBytes(splitStr[0]));
  248.                   //start_end_time.put("start_time", Bytes.toString(start_time));
  249.                   start_end_time.put("start_time", hr_bin);
  250.                   //myList.add((long)hr_bin);
  251.                   HouseHoldput.add(HouseHold_CF,HouseHold_a,Bytes.toBytes(start_end_time.toString()));
  252.                   HBase_HouseHoldtable.put(HouseHoldput);
  253.                   set.clear();
  254.           }
  255.          
  256.           obj1.put("contentId", splitStr[1]);
  257.           obj1.put("name", splitStr[2]);
  258.           obj1.put("duration", splitStr[3]);
  259.           obj1.put("content_date", splitStr[4]);
  260.           obj1.put("content_time", splitStr[5]);
  261.           obj1.put("genre", splitStr[6]);
  262.           obj1.put("type", splitStr[7]);
  263.           obj1.put("channelId", splitStr[8]);
  264.           obj1.put("session_time_bin", session_time_bin);
  265.           obj1.put("personId", splitStr[9]);
  266.           obj1.put("televisionId", splitStr[10]);
  267.           obj1.put("timestamp", date1);
  268.           obj.put(splitStr[0], obj1 );
  269.           Put put = new Put(Bytes.toBytes(row_key));
  270.           column_family = "" + (min_diff*60);
  271.           put.add(CF, Bytes.toBytes(column_family), Bytes.toBytes(obj1.toString())); //put it to a HBase table
  272.           context.write(null, put);
  273.           }
  274.       }
  275.       catch(Exception e) {
  276.           e.printStackTrace();
  277.       }
  278.     }
  279.   }
  280.  
  281.   public static class AnalyzeReducerFile
  282.   extends Reducer<Text, IntWritable, Text, IntWritable> {
  283.       private Text output_text = new Text();
  284.       String[] splitStr;
  285.       int[] time_slot = {2,4,7,9,12,14,17,19,21,24};
  286.       public static final byte[] CF = "cf".getBytes();
  287.     @SuppressWarnings("unchecked")
  288.     @Override
  289.     protected void reduce(Text key, Iterable<IntWritable> values,
  290.       Context context) throws IOException, InterruptedException {
  291.       int count = 0;
  292.       String column_family = "";
  293.       int min=0,max=0;
  294.       JSONObject obj = new JSONObject();
  295.       JSONObject obj1 = new JSONObject();
  296.       try {
  297.           for (IntWritable one : values) count++;
  298.           String base = key.toString();
  299.           if(base!=null) {
  300.           System.out.println(base);
  301.           splitStr = base.split("\\s+");
  302.           String[] time_split = splitStr[5].split(":");
  303.           int hr = Integer.parseInt(time_split[0]);
  304.           for(int i=0;i<time_slot.length;i++){
  305.               if(time_slot[i]<=hr && time_slot[i+1]>=hr){
  306.                   min = time_slot[i];
  307.                   max = time_slot[i+1];
  308.               }
  309.           }
  310.           String session_time_bin = "TIME_" + String.valueOf(min) + "_TO_" +String.valueOf(max);
  311.           long unix_time = Long.parseLong(splitStr[11]);
  312.           long hr_bin = ((unix_time/1000) - ((unix_time/1000) % 3600));
  313.           String min_bin = new java.text.SimpleDateFormat("mm").format(new java.util.Date (hr_bin*1000));
  314.           String min_actual = new java.text.SimpleDateFormat("mm").format(new java.util.Date (unix_time));
  315.           int min_diff = Integer.parseInt(min_actual) - Integer.parseInt(min_bin);
  316.           String row_key = splitStr[0] +"-"+ (hr_bin);
  317.           System.out.println("Unix Time" + unix_time/1000);
  318.           System.out.println("HR_BIN" + hr_bin);
  319.           String date1 = new java.text.SimpleDateFormat("MM-dd-yyyy EEE HH:mm:ss aaa").format(new java.util.Date (unix_time));
  320.           obj1.put("contentId", splitStr[1]);
  321.           obj1.put("name", splitStr[2]);
  322.           obj1.put("duration", splitStr[3]);
  323.           obj1.put("actual_time", splitStr[5]);
  324.           obj1.put("genre", splitStr[6]);
  325.           obj1.put("type", splitStr[7]);
  326.           obj1.put("channelId", splitStr[8]);
  327.           obj1.put("session_time_bin", session_time_bin);
  328.           obj1.put("personId", splitStr[9]);
  329.           obj1.put("televisionId", splitStr[10]);
  330.           obj1.put("timestamp", date1);
  331.           obj.put(splitStr[0], obj1 );
  332.           output_text.set(obj.toString());
  333.           context.write(output_text, new IntWritable(count));
  334.           }
  335.       }  
  336.       catch(Exception e) {
  337.           e.printStackTrace();
  338.       }
  339.     }
  340.   }
  341.   public static void main(String[] args) throws IOException,InterruptedException,ClassNotFoundException {
  342.      
  343.       Properties prop = new Properties();
  344.       int count = 0;
  345.      
  346.      
  347.     try {
  348.             //load a properties file from class path, inside static method
  349.             prop.load(SampleJob.class.getClassLoader().getResourceAsStream("config.properties"));    
  350.             //get the property value and print it out
  351.             ViewersTable=prop.getProperty("ViewersTable");
  352.             ViewersTable_ColumnFamily=prop.getProperty("ViewersTable_ColumnFamily");
  353.             ViewersTable_ColumnQualifier=prop.getProperty("ViewersTable_ColumnQualifier");
  354.             ContentidxTable=prop.getProperty("ContentidxTable");
  355.             ContentidxTable_ColumnFamily=prop.getProperty("ContentidxTable_ColumnFamily");
  356.             ContentidxTable_ColumnQualifier=prop.getProperty("ContentidxTable_ColumnQualifier");
  357.             ContentTable=prop.getProperty("ContentTable");
  358.             ContentTable_ColumnFamily=prop.getProperty("ContentTable_ColumnFamily");
  359.             ContentTable_ColumnQualifier=prop.getProperty("ContentTable_ColumnQualifier");
  360.             OutputTable=prop.getProperty("OutputTable");
  361.             OutputTable_ColumnFamily=prop.getProperty("OutputTable_ColumnFamily");
  362.             OutputTable_ColumnQualifier=prop.getProperty("OutputTable_ColumnQualifier");
  363.             HouseHoldTable=prop.getProperty("HouseHoldTable");
  364.             HouseHoldTable_ColumnFamily=prop.getProperty("HouseHoldTable_ColumnFamily");
  365.             HouseHoldTable_ColumnQualifier=prop.getProperty("HouseHoldTable_ColumnQualifier");
  366.  
  367.     } catch (IOException ex) {
  368.         throw new IOException("Config file issue");
  369.       }
  370.     Configuration conf = HBaseConfiguration.create();
  371.     conf.set("hbase.zookeeper.quorum", "10.10.100.170");
  372.     Job job = new Job(conf, ViewersTable);
  373.     if(job!=null) {
  374.     job.setJarByClass(SampleJob.class);     // class that contains mapper
  375.  
  376.     Scan scan = new Scan();
  377.     scan.addColumn(Bytes.toBytes(ViewersTable_ColumnFamily), Bytes.toBytes(ViewersTable_ColumnQualifier));
  378.   //scan.addColumn(Bytes.toBytes("cfa"), Bytes.toBytes("l"));
  379.     String option = "t";
  380.     TableMapReduceUtil.initTableMapperJob(
  381.       ViewersTable,        // input HBase table name
  382.       scan,             // Scan instance to control CF and attribute selection
  383.       AnalyzeMapper.class,   // mapper
  384.       Text.class,             // mapper output key
  385.       IntWritable.class,             // mapper output value
  386.       job);
  387.     if(option.equals("t")) {
  388.     TableMapReduceUtil.initTableReducerJob(
  389.             OutputTable,        // output table
  390.             AnalyzeReducerTable.class,    // reducer class
  391.             job);
  392.     job.setNumReduceTasks(1);  
  393.     }
  394.     else {
  395.         throw new IOException("Not a valid command line option.");
  396.     }
  397.     boolean b = job.waitForCompletion(true);
  398.     set.clear();
  399.    // boolean b = true;
  400.     if (!b) {
  401.       throw new IOException("error with job!");
  402.     }
  403.     else {
  404.     System.out.println("Job is completed!");
  405.         }
  406.     }
  407.   }
  408. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement