Pastebin launched a little side project called VERYVIRAL.com, check it out ;-) Want more features on Pastebin? Sign Up, it's FREE!
Guest

Untitled

By: pavan0591 on Aug 25th, 2013  |  syntax: Java  |  size: 16.99 KB  |  views: 42  |  expires: Never
download  |  raw  |  embed  |  report abuse  |  print
This paste has a previous version, view the difference. Text below is selected. Please press Ctrl+C to copy to your clipboard. (⌘+C on Mac)
  1. public class SampleJob {
  2.        
  3.                 public static String ViewersTable=null;
  4.                 public static String ViewersTable_ColumnFamily=null;
  5.                 public static String ViewersTable_ColumnQualifier=null;
  6.                 public static String ContentidxTable=null;
  7.                 public static String ContentidxTable_ColumnFamily=null;
  8.                 public static String ContentidxTable_ColumnQualifier=null;
  9.                 public static String ContentTable=null;
  10.                 public static String ContentTable_ColumnFamily=null;
  11.                 public static String ContentTable_ColumnQualifier=null;
  12.                 public static String OutputTable=null;
  13.                 public static String OutputTable_ColumnQualifier=null;
  14.                 public static String OutputTable_ColumnFamily=null;
  15.                 public static String HouseHoldTable=null;
  16.                 public static String HouseHoldTable_ColumnQualifier=null;
  17.                 public static String HouseHoldTable_ColumnFamily=null;
  18.                 static HashSet<Long> set = new HashSet<Long>();
  19.         /**
  20.    * Implements the Mapper that reads the data and extracts the
  21.    * required information.
  22.    */
  23.   static class AnalyzeMapper extends TableMapper<Text, IntWritable>  {
  24.  
  25.           private JSONParser parser = new JSONParser();
  26.           private IntWritable ONE = new IntWritable(1);
  27.           HashSet<String> hs = new HashSet<String>();
  28.           public static byte[] name, duration,start,genre,type;
  29.           static String start_string;
  30.                 static String[] date;
  31.                 static String act_date;
  32.                 static String time;
  33.           public static String return_value;
  34.           String mapper_output,reducer_input;
  35.           private Text text = new Text();
  36.          
  37.     /**
  38.      * Maps the input. Join of 3 HBase Tables.
  39.      *
  40.      * @param row The row key.
  41.      * @param columns The columns of the row.
  42.      * @param context The task context.
  43.      * @throws java.io.IOException When mapping the input fails.
  44.      */
  45.     // vv AnalyzeData
  46.     @Override
  47.     public void map(ImmutableBytesWritable row, Result columns, Context context)
  48.     throws IOException {
  49.         try {
  50.                 //getHouseHoldID();
  51.             for (KeyValue kv : columns.list()) {
  52.                 String value = Bytes.toStringBinary(kv.getValue());
  53.                 //hs.add(Bytes.toStringBinary(kv.getValue()));
  54.                
  55.                 /* parsing the json for contentID and HouseholdID*/
  56.                
  57.                 JSONObject jsonObj = (JSONObject) parser.parse(value);
  58.                 JSONObject payload = (JSONObject) jsonObj.get("payload"); //for parsing contentID
  59.                 Long contentId = (Long) payload.get("contentId"); //get the contentId
  60.                 String channelId = (String) payload.get("channelId"); //channelId
  61.                 String personId = (String) payload.get("personId"); //personId
  62.                 String contentID = String.valueOf(contentId); //cast it to a string
  63.                 String televisionID = (String) payload.get("televisionId"); //televisionId
  64.                 JSONObject header = (JSONObject) jsonObj.get("header"); //for parsing HouseholdID
  65.                 String HouseHoldId = (String) header.get("identifier"); //get the householdID
  66.                 long timestamp = (Long) header.get("timestamp");
  67.                
  68.                 /* end parsing of contentID and HouseholdID*/
  69.                
  70.                 //if parsing is not needed, send the variable 'value' directly to contentjoin method
  71.                
  72.                 mapper_output = contentidxjoin(contentID);
  73.                 reducer_input = HouseHoldId +" "+mapper_output +" "+channelId+" "+personId+" "+televisionID +" "+timestamp;
  74.                 if(reducer_input!=null) {
  75.                 text.set(reducer_input);
  76.                 //System.out.println("Reducer input is " + reducer_input);
  77.                 context.write(text,ONE);
  78.                 }
  79.             }
  80.         }
  81.         catch (Exception e) {
  82.             e.printStackTrace();
  83.         }
  84.   }
  85.         public static String contentidxjoin(String contentId) {
  86.                   Configuration conf = HBaseConfiguration.create();
  87.                   conf.set("hbase.zookeeper.quorum", "10.10.100.187");
  88.                   @SuppressWarnings("resource")
  89.                 HTable table;
  90.                 try {
  91.                         table = new HTable(conf, ContentidxTable);
  92.                         if(table!= null) {
  93.                         Get get1 = new Get(Bytes.toBytes(contentId));
  94.                         get1.addColumn(Bytes.toBytes(ContentidxTable_ColumnFamily), Bytes.toBytes(ContentidxTable_ColumnQualifier));
  95.                         Result result1 = table.get(get1);
  96.                         byte[] val1 = result1.getValue(Bytes.toBytes(ContentidxTable_ColumnFamily),
  97.                               Bytes.toBytes(ContentidxTable_ColumnQualifier));
  98.                   //System.out.println("Value: " + Bytes.toString(val1));
  99.                         return_value = contentjoin(Bytes.toString(val1),contentId);
  100.                         }
  101.                 }
  102.                
  103.                 catch (Exception e) {
  104.                         e.printStackTrace();
  105.                 }
  106.                 return return_value;
  107.                
  108.           }
  109.     public static String contentjoin(String value,String contentID) {
  110.        
  111.                   Configuration conf = HBaseConfiguration.create();
  112.                 conf.set("hbase.zookeeper.quorum", "10.10.100.187");
  113.                   @SuppressWarnings("resource")
  114.                 HTable table;
  115.                 try {
  116.                           table = new HTable(conf, ContentTable);
  117.                           if(table!=null) {
  118.                           Get get1 = new Get(Bytes.toBytes(value));
  119.                           get1.addFamily(Bytes.toBytes(ContentTable_ColumnFamily));
  120.                           Result result1 = table.get(get1);
  121.                          
  122.                           //print out the name
  123.                           name = result1.getValue(Bytes.toBytes(ContentTable_ColumnFamily),
  124.                                       Bytes.toBytes("name"));
  125.                           //print out the duration
  126.                           duration = result1.getValue(Bytes.toBytes(ContentTable_ColumnFamily),
  127.                                       Bytes.toBytes("duration"));
  128.                           //print out start time
  129.                           start = result1.getValue(Bytes.toBytes(ContentTable_ColumnFamily),
  130.                                       Bytes.toBytes("start"));
  131.                           start_string = Bytes.toString(start);
  132.                           date = start_string.split("T");
  133.                           act_date = date[0];
  134.                           time = date[1];
  135.                           time = time.replace("Z", "");
  136.                           //print out genre
  137.                           genre = result1.getValue(Bytes.toBytes(ContentTable_ColumnFamily),
  138.                                       Bytes.toBytes("genre"));
  139.                         //print out type
  140.                           type = result1.getValue(Bytes.toBytes(ContentTable_ColumnFamily),
  141.                                       Bytes.toBytes("type"));
  142.                           }
  143.                 }
  144.                 catch (Exception e) {
  145.                         e.printStackTrace();
  146.                 }
  147.                 return contentID + " "+ Bytes.toString(name).replaceAll("\\s","") +" "+ Bytes.toString(duration) +" "+ act_date +" "+ time +" "+ Bytes.toString(genre)+" "+ Bytes.toString(type);
  148.                 //return Bytes.toString(duration) + " "+ Bytes.toString(name) +" "+ contentID +" "+ Bytes.toString(start) +" "+ Bytes.toString(genre)+" "+ Bytes.toString(type);
  149.     }
  150.   }
  151.   /*
  152.    * Reducer to implement GroupBY of the mapper output and dump the result to a file.
  153.   */  
  154. public static class AnalyzeReducerTable  
  155.   extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
  156.           private JSONParser parser = new JSONParser();
  157.           public static final byte[] HouseHold_CF = HouseHoldTable_ColumnFamily.getBytes();
  158.           public static final byte[] HouseHold_a = HouseHoldTable_ColumnQualifier.getBytes();
  159.           int count = 0;
  160.           String[] splitStr;
  161.           public static final byte[] CF = OutputTable_ColumnFamily.getBytes();
  162.           int[] time_slot = {0,2,4,7,9,12,14,17,19,21,24};
  163.           public static final byte[] a = OutputTable_ColumnQualifier.getBytes();
  164.           int key_row_count = 1;
  165.           int row_count = 1;
  166.           String current_HouseHoldId="";
  167.    
  168.           @Override
  169.     protected void reduce(Text key, Iterable<IntWritable> values,
  170.       Context context) throws IOException, InterruptedException {
  171.       Configuration HouseHold_conf = HBaseConfiguration.create();
  172.         HouseHolds_conf.set("hbase.zookeeper.quorum", "10.10.100.187");
  173.           @SuppressWarnings("resource")
  174.           HTable HBase_HouseHoldtable = new HTable(HouseHold_conf, HouseHoldTable);
  175.        
  176.       String column_family = "";
  177.       int min=0,max=0;
  178.       JSONObject obj = new JSONObject();
  179.       JSONObject obj1 = new JSONObject();
  180.       JSONObject start_end_time = new JSONObject();
  181.       try{
  182.               for (IntWritable one : values) count++;
  183.               String base = key.toString();
  184.               if(base!=null) {
  185.               splitStr = base.split("\\s+");
  186.               System.out.println(key.toString());
  187.               if(!(current_HouseHoldId.equals(splitStr[0]))) {
  188.                   set.clear();
  189.               }
  190.              
  191.               SimpleDateFormat sdf = new SimpleDateFormat("MM-dd-yyyy EEE HH:mm:ss aaa");
  192.               sdf.setTimeZone(new SimpleTimeZone(SimpleTimeZone.UTC_TIME, "UTC"));
  193.               String date1 = sdf.format(new java.util.Date (unix_time));
  194.               //System.out.println(unix_time/1000);
  195.               String[] time_split = date1.split(" ");
  196.               String[] time_bin  = time_split[2].split(":");
  197.               int hr = Integer.parseInt(time_bin[0]);
  198.               for(int i=0;i<time_slot.length;i++){
  199.                   if(time_slot[i]<=hr && time_slot[i+1]>=hr){
  200.                           min = time_slot[i];
  201.                           max = time_slot[i+1];
  202.                   }
  203.               }
  204.               String session_time_bin = "TIME_" + String.valueOf(min) + "_TO_" +String.valueOf(max);
  205.               String row_key;
  206.               row_key = splitStr[0] +"-"+ (hr_bin);
  207.              
  208.               //byte[] row_key_byte = HouseHoldId_byte + Bytes.toBytes("-") + hr_bin_byte;
  209.               byte[] start_time=null;
  210.               //get operation
  211.               Get get = new Get(Bytes.toBytes(splitStr[0]));
  212.               get.addColumn(HouseHold_CF, HouseHold_a);
  213.               Result result1 = HBase_HouseHoldtable.get(get);
  214.               if(result1!= null) {
  215.               start_time = result1.getValue(HouseHold_CF,
  216.                               HouseHold_a);
  217.               }
  218.               if(Bytes.toString(start_time) != null) {
  219.                   //System.out.println(Bytes.toString(start_time));
  220.               Put HouseHoldput = new Put(Bytes.toBytes(splitStr[0]));
  221.               String value1 = Bytes.toString(start_time);
  222.               JSONObject jsonObj = (JSONObject) parser.parse(value1);
  223.                 long start_actual_time = (Long) jsonObj.get("start_time");
  224.                 try {
  225.                         long end_actual_time = (Long) jsonObj.get("end_time");
  226.                         set.add(end_actual_time);    
  227.                         System.out.println("End time" + end_actual_time);
  228.                         }
  229.                 catch(Exception e) {
  230.                        
  231.                 }
  232.                 System.out.println("Start time" + start_actual_time);
  233.                 set.add(start_actual_time);
  234.                 set.add(hr_bin);
  235.                 //myList.add(hr_bin);
  236.                 Object array_min = Collections.min(set);
  237.                 Object array_max = Collections.max(set);
  238.                 if(!(array_min.equals(array_max))) {
  239.                         start_end_time.put("start_time", array_min);
  240.                         start_end_time.put("end_time", array_max);
  241.                 } else {
  242.                         start_end_time.put("start_time", array_min);
  243.                 }
  244.                   HouseHoldput.add(HouseHold_CF,HouseHold_a,Bytes.toBytes(start_end_time.toString()));
  245.                       HBase_HouseHoldtable.put(HouseHoldput);
  246.               } else {
  247.                   Put HouseHoldput = new Put(Bytes.toBytes(splitStr[0]));
  248.                           //start_end_time.put("start_time", Bytes.toString(start_time));
  249.                           start_end_time.put("start_time", hr_bin);
  250.                           //myList.add((long)hr_bin);
  251.                           HouseHoldput.add(HouseHold_CF,HouseHold_a,Bytes.toBytes(start_end_time.toString()));
  252.                               HBase_HouseHoldtable.put(HouseHoldput);
  253.                               set.clear();
  254.               }
  255.              
  256.               obj1.put("contentId", splitStr[1]);
  257.               obj1.put("name", splitStr[2]);
  258.               obj1.put("duration", splitStr[3]);
  259.               obj1.put("content_date", splitStr[4]);
  260.               obj1.put("content_time", splitStr[5]);
  261.               obj1.put("genre", splitStr[6]);
  262.               obj1.put("type", splitStr[7]);
  263.               obj1.put("channelId", splitStr[8]);
  264.               obj1.put("session_time_bin", session_time_bin);
  265.               obj1.put("personId", splitStr[9]);
  266.               obj1.put("televisionId", splitStr[10]);
  267.               obj1.put("timestamp", date1);
  268.               obj.put(splitStr[0], obj1 );
  269.               Put put = new Put(Bytes.toBytes(row_key));
  270.               column_family = "" + (min_diff*60);
  271.               put.add(CF, Bytes.toBytes(column_family), Bytes.toBytes(obj1.toString())); //put it to a HBase table
  272.               context.write(null, put);
  273.               }
  274.       }
  275.       catch(Exception e) {
  276.           e.printStackTrace();
  277.       }
  278.     }
  279.   }
  280.  
  281.   public static class AnalyzeReducerFile
  282.   extends Reducer<Text, IntWritable, Text, IntWritable> {
  283.           private Text output_text = new Text();
  284.           String[] splitStr;
  285.           int[] time_slot = {2,4,7,9,12,14,17,19,21,24};
  286.           public static final byte[] CF = "cf".getBytes();
  287.     @SuppressWarnings("unchecked")
  288.         @Override
  289.     protected void reduce(Text key, Iterable<IntWritable> values,
  290.       Context context) throws IOException, InterruptedException {
  291.       int count = 0;
  292.       String column_family = "";
  293.       int min=0,max=0;
  294.       JSONObject obj = new JSONObject();
  295.       JSONObject obj1 = new JSONObject();
  296.       try {
  297.               for (IntWritable one : values) count++;
  298.               String base = key.toString();
  299.               if(base!=null) {
  300.               System.out.println(base);
  301.               splitStr = base.split("\\s+");
  302.               String[] time_split = splitStr[5].split(":");
  303.               int hr = Integer.parseInt(time_split[0]);
  304.               for(int i=0;i<time_slot.length;i++){
  305.                   if(time_slot[i]<=hr && time_slot[i+1]>=hr){
  306.                           min = time_slot[i];
  307.                           max = time_slot[i+1];
  308.                   }
  309.               }
  310.               String session_time_bin = "TIME_" + String.valueOf(min) + "_TO_" +String.valueOf(max);
  311.               long unix_time = Long.parseLong(splitStr[11]);
  312.               long hr_bin = ((unix_time/1000) - ((unix_time/1000) % 3600));
  313.               String min_bin = new java.text.SimpleDateFormat("mm").format(new java.util.Date (hr_bin*1000));
  314.               String min_actual = new java.text.SimpleDateFormat("mm").format(new java.util.Date (unix_time));
  315.               int min_diff = Integer.parseInt(min_actual) - Integer.parseInt(min_bin);
  316.               String row_key = splitStr[0] +"-"+ (hr_bin);
  317.               System.out.println("Unix Time" + unix_time/1000);
  318.               System.out.println("HR_BIN" + hr_bin);
  319.               String date1 = new java.text.SimpleDateFormat("MM-dd-yyyy EEE HH:mm:ss aaa").format(new java.util.Date (unix_time));
  320.               obj1.put("contentId", splitStr[1]);
  321.               obj1.put("name", splitStr[2]);
  322.               obj1.put("duration", splitStr[3]);
  323.               obj1.put("actual_time", splitStr[5]);
  324.               obj1.put("genre", splitStr[6]);
  325.               obj1.put("type", splitStr[7]);
  326.               obj1.put("channelId", splitStr[8]);
  327.               obj1.put("session_time_bin", session_time_bin);
  328.               obj1.put("personId", splitStr[9]);
  329.               obj1.put("televisionId", splitStr[10]);
  330.               obj1.put("timestamp", date1);
  331.               obj.put(splitStr[0], obj1 );
  332.               output_text.set(obj.toString());
  333.               context.write(output_text, new IntWritable(count));
  334.               }
  335.       }  
  336.       catch(Exception e) {
  337.           e.printStackTrace();
  338.       }
  339.     }
  340.   }
  341.   public static void main(String[] args) throws IOException,InterruptedException,ClassNotFoundException {
  342.          
  343.           Properties prop = new Properties();
  344.           int count = 0;
  345.          
  346.          
  347.         try {
  348.                 //load a properties file from class path, inside static method
  349.                         prop.load(SampleJob.class.getClassLoader().getResourceAsStream("config.properties"));    
  350.                         //get the property value and print it out
  351.                         ViewersTable=prop.getProperty("ViewersTable");
  352.                         ViewersTable_ColumnFamily=prop.getProperty("ViewersTable_ColumnFamily");
  353.                         ViewersTable_ColumnQualifier=prop.getProperty("ViewersTable_ColumnQualifier");
  354.                         ContentidxTable=prop.getProperty("ContentidxTable");
  355.                         ContentidxTable_ColumnFamily=prop.getProperty("ContentidxTable_ColumnFamily");
  356.                         ContentidxTable_ColumnQualifier=prop.getProperty("ContentidxTable_ColumnQualifier");
  357.                         ContentTable=prop.getProperty("ContentTable");
  358.                         ContentTable_ColumnFamily=prop.getProperty("ContentTable_ColumnFamily");
  359.                         ContentTable_ColumnQualifier=prop.getProperty("ContentTable_ColumnQualifier");
  360.                         OutputTable=prop.getProperty("OutputTable");
  361.                         OutputTable_ColumnFamily=prop.getProperty("OutputTable_ColumnFamily");
  362.                         OutputTable_ColumnQualifier=prop.getProperty("OutputTable_ColumnQualifier");
  363.                         HouseHoldTable=prop.getProperty("HouseHoldTable");
  364.                         HouseHoldTable_ColumnFamily=prop.getProperty("HouseHoldTable_ColumnFamily");
  365.                         HouseHoldTable_ColumnQualifier=prop.getProperty("HouseHoldTable_ColumnQualifier");
  366.  
  367.         } catch (IOException ex) {
  368.                 throw new IOException("Config file issue");
  369.       }
  370.         Configuration conf = HBaseConfiguration.create();
  371.         conf.set("hbase.zookeeper.quorum", "10.10.100.170");
  372.     Job job = new Job(conf, ViewersTable);
  373.     if(job!=null) {
  374.     job.setJarByClass(SampleJob.class);     // class that contains mapper
  375.  
  376.     Scan scan = new Scan();
  377.     scan.addColumn(Bytes.toBytes(ViewersTable_ColumnFamily), Bytes.toBytes(ViewersTable_ColumnQualifier));
  378.   //scan.addColumn(Bytes.toBytes("cfa"), Bytes.toBytes("l"));
  379.     String option = "t";
  380.     TableMapReduceUtil.initTableMapperJob(
  381.       ViewersTable,        // input HBase table name
  382.       scan,             // Scan instance to control CF and attribute selection
  383.       AnalyzeMapper.class,   // mapper
  384.       Text.class,             // mapper output key
  385.       IntWritable.class,             // mapper output value
  386.       job);
  387.     if(option.equals("t")) {
  388.     TableMapReduceUtil.initTableReducerJob(
  389.                 OutputTable,        // output table
  390.                 AnalyzeReducerTable.class,    // reducer class
  391.                 job);
  392.     job.setNumReduceTasks(1);  
  393.     }
  394.     else {
  395.         throw new IOException("Not a valid command line option.");
  396.     }
  397.     boolean b = job.waitForCompletion(true);
  398.     set.clear();
  399.    // boolean b = true;
  400.     if (!b) {
  401.       throw new IOException("error with job!");
  402.     }
  403.     else {
  404.     System.out.println("Job is completed!");
  405.         }
  406.     }
  407.   }
  408. }