public class SampleJob { public static String ViewersTable=null; public static String ViewersTable_ColumnFamily=null; public static String ViewersTable_ColumnQualifier=null; public static String ContentidxTable=null; public static String ContentidxTable_ColumnFamily=null; public static String ContentidxTable_ColumnQualifier=null; public static String ContentTable=null; public static String ContentTable_ColumnFamily=null; public static String ContentTable_ColumnQualifier=null; public static String OutputTable=null; public static String OutputTable_ColumnQualifier=null; public static String OutputTable_ColumnFamily=null; public static String HouseHoldTable=null; public static String HouseHoldTable_ColumnQualifier=null; public static String HouseHoldTable_ColumnFamily=null; static HashSet set = new HashSet(); /** * Implements the Mapper that reads the data and extracts the * required information. */ static class AnalyzeMapper extends TableMapper { private JSONParser parser = new JSONParser(); private IntWritable ONE = new IntWritable(1); HashSet hs = new HashSet(); public static byte[] name, duration,start,genre,type; static String start_string; static String[] date; static String act_date; static String time; public static String return_value; String mapper_output,reducer_input; private Text text = new Text(); /** * Maps the input. Join of 3 HBase Tables. * * @param row The row key. * @param columns The columns of the row. * @param context The task context. * @throws java.io.IOException When mapping the input fails. */ // vv AnalyzeData @Override public void map(ImmutableBytesWritable row, Result columns, Context context) throws IOException { try { //getHouseHoldID(); for (KeyValue kv : columns.list()) { String value = Bytes.toStringBinary(kv.getValue()); //hs.add(Bytes.toStringBinary(kv.getValue())); /* parsing the json for contentID and HouseholdID*/ JSONObject jsonObj = (JSONObject) parser.parse(value); JSONObject payload = (JSONObject) jsonObj.get("payload"); //for parsing contentID Long contentId = (Long) payload.get("contentId"); //get the contentId String channelId = (String) payload.get("channelId"); //channelId String personId = (String) payload.get("personId"); //personId String contentID = String.valueOf(contentId); //cast it to a string String televisionID = (String) payload.get("televisionId"); //televisionId JSONObject header = (JSONObject) jsonObj.get("header"); //for parsing HouseholdID String HouseHoldId = (String) header.get("identifier"); //get the householdID long timestamp = (Long) header.get("timestamp"); /* end parsing of contentID and HouseholdID*/ //if parsing is not needed, send the variable 'value' directly to contentjoin method mapper_output = contentidxjoin(contentID); reducer_input = HouseHoldId +" "+mapper_output +" "+channelId+" "+personId+" "+televisionID +" "+timestamp; if(reducer_input!=null) { text.set(reducer_input); //System.out.println("Reducer input is " + reducer_input); context.write(text,ONE); } } } catch (Exception e) { e.printStackTrace(); } } public static String contentidxjoin(String contentId) { Configuration conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum", "10.10.100.187"); @SuppressWarnings("resource") HTable table; try { table = new HTable(conf, ContentidxTable); if(table!= null) { Get get1 = new Get(Bytes.toBytes(contentId)); get1.addColumn(Bytes.toBytes(ContentidxTable_ColumnFamily), Bytes.toBytes(ContentidxTable_ColumnQualifier)); Result result1 = table.get(get1); byte[] val1 = result1.getValue(Bytes.toBytes(ContentidxTable_ColumnFamily), Bytes.toBytes(ContentidxTable_ColumnQualifier)); //System.out.println("Value: " + Bytes.toString(val1)); return_value = contentjoin(Bytes.toString(val1),contentId); } } catch (Exception e) { e.printStackTrace(); } return return_value; } public static String contentjoin(String value,String contentID) { Configuration conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum", "10.10.100.187"); @SuppressWarnings("resource") HTable table; try { table = new HTable(conf, ContentTable); if(table!=null) { Get get1 = new Get(Bytes.toBytes(value)); get1.addFamily(Bytes.toBytes(ContentTable_ColumnFamily)); Result result1 = table.get(get1); //print out the name name = result1.getValue(Bytes.toBytes(ContentTable_ColumnFamily), Bytes.toBytes("name")); //print out the duration duration = result1.getValue(Bytes.toBytes(ContentTable_ColumnFamily), Bytes.toBytes("duration")); //print out start time start = result1.getValue(Bytes.toBytes(ContentTable_ColumnFamily), Bytes.toBytes("start")); start_string = Bytes.toString(start); date = start_string.split("T"); act_date = date[0]; time = date[1]; time = time.replace("Z", ""); //print out genre genre = result1.getValue(Bytes.toBytes(ContentTable_ColumnFamily), Bytes.toBytes("genre")); //print out type type = result1.getValue(Bytes.toBytes(ContentTable_ColumnFamily), Bytes.toBytes("type")); } } catch (Exception e) { e.printStackTrace(); } return contentID + " "+ Bytes.toString(name).replaceAll("\\s","") +" "+ Bytes.toString(duration) +" "+ act_date +" "+ time +" "+ Bytes.toString(genre)+" "+ Bytes.toString(type); //return Bytes.toString(duration) + " "+ Bytes.toString(name) +" "+ contentID +" "+ Bytes.toString(start) +" "+ Bytes.toString(genre)+" "+ Bytes.toString(type); } } /* * Reducer to implement GroupBY of the mapper output and dump the result to a file. */ public static class AnalyzeReducerTable extends TableReducer { private JSONParser parser = new JSONParser(); public static final byte[] HouseHold_CF = HouseHoldTable_ColumnFamily.getBytes(); public static final byte[] HouseHold_a = HouseHoldTable_ColumnQualifier.getBytes(); int count = 0; String[] splitStr; public static final byte[] CF = OutputTable_ColumnFamily.getBytes(); int[] time_slot = {0,2,4,7,9,12,14,17,19,21,24}; public static final byte[] a = OutputTable_ColumnQualifier.getBytes(); int key_row_count = 1; int row_count = 1; String current_HouseHoldId=""; @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { Configuration HouseHold_conf = HBaseConfiguration.create(); HouseHolds_conf.set("hbase.zookeeper.quorum", "10.10.100.187"); @SuppressWarnings("resource") HTable HBase_HouseHoldtable = new HTable(HouseHold_conf, HouseHoldTable); String column_family = ""; int min=0,max=0; JSONObject obj = new JSONObject(); JSONObject obj1 = new JSONObject(); JSONObject start_end_time = new JSONObject(); try{ for (IntWritable one : values) count++; String base = key.toString(); if(base!=null) { splitStr = base.split("\\s+"); System.out.println(key.toString()); if(!(current_HouseHoldId.equals(splitStr[0]))) { set.clear(); } SimpleDateFormat sdf = new SimpleDateFormat("MM-dd-yyyy EEE HH:mm:ss aaa"); sdf.setTimeZone(new SimpleTimeZone(SimpleTimeZone.UTC_TIME, "UTC")); String date1 = sdf.format(new java.util.Date (unix_time)); //System.out.println(unix_time/1000); String[] time_split = date1.split(" "); String[] time_bin = time_split[2].split(":"); int hr = Integer.parseInt(time_bin[0]); for(int i=0;i=hr){ min = time_slot[i]; max = time_slot[i+1]; } } String session_time_bin = "TIME_" + String.valueOf(min) + "_TO_" +String.valueOf(max); String row_key; row_key = splitStr[0] +"-"+ (hr_bin); //byte[] row_key_byte = HouseHoldId_byte + Bytes.toBytes("-") + hr_bin_byte; byte[] start_time=null; //get operation Get get = new Get(Bytes.toBytes(splitStr[0])); get.addColumn(HouseHold_CF, HouseHold_a); Result result1 = HBase_HouseHoldtable.get(get); if(result1!= null) { start_time = result1.getValue(HouseHold_CF, HouseHold_a); } if(Bytes.toString(start_time) != null) { //System.out.println(Bytes.toString(start_time)); Put HouseHoldput = new Put(Bytes.toBytes(splitStr[0])); String value1 = Bytes.toString(start_time); JSONObject jsonObj = (JSONObject) parser.parse(value1); long start_actual_time = (Long) jsonObj.get("start_time"); try { long end_actual_time = (Long) jsonObj.get("end_time"); set.add(end_actual_time); System.out.println("End time" + end_actual_time); } catch(Exception e) { } System.out.println("Start time" + start_actual_time); set.add(start_actual_time); set.add(hr_bin); //myList.add(hr_bin); Object array_min = Collections.min(set); Object array_max = Collections.max(set); if(!(array_min.equals(array_max))) { start_end_time.put("start_time", array_min); start_end_time.put("end_time", array_max); } else { start_end_time.put("start_time", array_min); } HouseHoldput.add(HouseHold_CF,HouseHold_a,Bytes.toBytes(start_end_time.toString())); HBase_HouseHoldtable.put(HouseHoldput); } else { Put HouseHoldput = new Put(Bytes.toBytes(splitStr[0])); //start_end_time.put("start_time", Bytes.toString(start_time)); start_end_time.put("start_time", hr_bin); //myList.add((long)hr_bin); HouseHoldput.add(HouseHold_CF,HouseHold_a,Bytes.toBytes(start_end_time.toString())); HBase_HouseHoldtable.put(HouseHoldput); set.clear(); } obj1.put("contentId", splitStr[1]); obj1.put("name", splitStr[2]); obj1.put("duration", splitStr[3]); obj1.put("content_date", splitStr[4]); obj1.put("content_time", splitStr[5]); obj1.put("genre", splitStr[6]); obj1.put("type", splitStr[7]); obj1.put("channelId", splitStr[8]); obj1.put("session_time_bin", session_time_bin); obj1.put("personId", splitStr[9]); obj1.put("televisionId", splitStr[10]); obj1.put("timestamp", date1); obj.put(splitStr[0], obj1 ); Put put = new Put(Bytes.toBytes(row_key)); column_family = "" + (min_diff*60); put.add(CF, Bytes.toBytes(column_family), Bytes.toBytes(obj1.toString())); //put it to a HBase table context.write(null, put); } } catch(Exception e) { e.printStackTrace(); } } } public static class AnalyzeReducerFile extends Reducer { private Text output_text = new Text(); String[] splitStr; int[] time_slot = {2,4,7,9,12,14,17,19,21,24}; public static final byte[] CF = "cf".getBytes(); @SuppressWarnings("unchecked") @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { int count = 0; String column_family = ""; int min=0,max=0; JSONObject obj = new JSONObject(); JSONObject obj1 = new JSONObject(); try { for (IntWritable one : values) count++; String base = key.toString(); if(base!=null) { System.out.println(base); splitStr = base.split("\\s+"); String[] time_split = splitStr[5].split(":"); int hr = Integer.parseInt(time_split[0]); for(int i=0;i=hr){ min = time_slot[i]; max = time_slot[i+1]; } } String session_time_bin = "TIME_" + String.valueOf(min) + "_TO_" +String.valueOf(max); long unix_time = Long.parseLong(splitStr[11]); long hr_bin = ((unix_time/1000) - ((unix_time/1000) % 3600)); String min_bin = new java.text.SimpleDateFormat("mm").format(new java.util.Date (hr_bin*1000)); String min_actual = new java.text.SimpleDateFormat("mm").format(new java.util.Date (unix_time)); int min_diff = Integer.parseInt(min_actual) - Integer.parseInt(min_bin); String row_key = splitStr[0] +"-"+ (hr_bin); System.out.println("Unix Time" + unix_time/1000); System.out.println("HR_BIN" + hr_bin); String date1 = new java.text.SimpleDateFormat("MM-dd-yyyy EEE HH:mm:ss aaa").format(new java.util.Date (unix_time)); obj1.put("contentId", splitStr[1]); obj1.put("name", splitStr[2]); obj1.put("duration", splitStr[3]); obj1.put("actual_time", splitStr[5]); obj1.put("genre", splitStr[6]); obj1.put("type", splitStr[7]); obj1.put("channelId", splitStr[8]); obj1.put("session_time_bin", session_time_bin); obj1.put("personId", splitStr[9]); obj1.put("televisionId", splitStr[10]); obj1.put("timestamp", date1); obj.put(splitStr[0], obj1 ); output_text.set(obj.toString()); context.write(output_text, new IntWritable(count)); } } catch(Exception e) { e.printStackTrace(); } } } public static void main(String[] args) throws IOException,InterruptedException,ClassNotFoundException { Properties prop = new Properties(); int count = 0; try { //load a properties file from class path, inside static method prop.load(SampleJob.class.getClassLoader().getResourceAsStream("config.properties")); //get the property value and print it out ViewersTable=prop.getProperty("ViewersTable"); ViewersTable_ColumnFamily=prop.getProperty("ViewersTable_ColumnFamily"); ViewersTable_ColumnQualifier=prop.getProperty("ViewersTable_ColumnQualifier"); ContentidxTable=prop.getProperty("ContentidxTable"); ContentidxTable_ColumnFamily=prop.getProperty("ContentidxTable_ColumnFamily"); ContentidxTable_ColumnQualifier=prop.getProperty("ContentidxTable_ColumnQualifier"); ContentTable=prop.getProperty("ContentTable"); ContentTable_ColumnFamily=prop.getProperty("ContentTable_ColumnFamily"); ContentTable_ColumnQualifier=prop.getProperty("ContentTable_ColumnQualifier"); OutputTable=prop.getProperty("OutputTable"); OutputTable_ColumnFamily=prop.getProperty("OutputTable_ColumnFamily"); OutputTable_ColumnQualifier=prop.getProperty("OutputTable_ColumnQualifier"); HouseHoldTable=prop.getProperty("HouseHoldTable"); HouseHoldTable_ColumnFamily=prop.getProperty("HouseHoldTable_ColumnFamily"); HouseHoldTable_ColumnQualifier=prop.getProperty("HouseHoldTable_ColumnQualifier"); } catch (IOException ex) { throw new IOException("Config file issue"); } Configuration conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum", "10.10.100.170"); Job job = new Job(conf, ViewersTable); if(job!=null) { job.setJarByClass(SampleJob.class); // class that contains mapper Scan scan = new Scan(); scan.addColumn(Bytes.toBytes(ViewersTable_ColumnFamily), Bytes.toBytes(ViewersTable_ColumnQualifier)); //scan.addColumn(Bytes.toBytes("cfa"), Bytes.toBytes("l")); String option = "t"; TableMapReduceUtil.initTableMapperJob( ViewersTable, // input HBase table name scan, // Scan instance to control CF and attribute selection AnalyzeMapper.class, // mapper Text.class, // mapper output key IntWritable.class, // mapper output value job); if(option.equals("t")) { TableMapReduceUtil.initTableReducerJob( OutputTable, // output table AnalyzeReducerTable.class, // reducer class job); job.setNumReduceTasks(1); } else { throw new IOException("Not a valid command line option."); } boolean b = job.waitForCompletion(true); set.clear(); // boolean b = true; if (!b) { throw new IOException("error with job!"); } else { System.out.println("Job is completed!"); } } } }