Advertisement
Not a member of Pastebin yet?
Sign up —
it unlocks many cool features!
- public class SampleJob {
- public static String ViewersTable=null;
- public static String ViewersTable_ColumnFamily=null;
- public static String ViewersTable_ColumnQualifier=null;
- public static String ContentidxTable=null;
- public static String ContentidxTable_ColumnFamily=null;
- public static String ContentidxTable_ColumnQualifier=null;
- public static String ContentTable=null;
- public static String ContentTable_ColumnFamily=null;
- public static String ContentTable_ColumnQualifier=null;
- public static String OutputTable=null;
- public static String OutputTable_ColumnQualifier=null;
- public static String OutputTable_ColumnFamily=null;
- public static String HouseHoldTable=null;
- public static String HouseHoldTable_ColumnQualifier=null;
- public static String HouseHoldTable_ColumnFamily=null;
- static HashSet<Long> set = new HashSet<Long>();
- /**
- * Implements the Mapper that reads the data and extracts the
- * required information.
- */
- static class AnalyzeMapper extends TableMapper<Text, IntWritable> {
- private JSONParser parser = new JSONParser();
- private IntWritable ONE = new IntWritable(1);
- HashSet<String> hs = new HashSet<String>();
- public static byte[] name, duration,start,genre,type;
- static String start_string;
- static String[] date;
- static String act_date;
- static String time;
- public static String return_value;
- String mapper_output,reducer_input;
- private Text text = new Text();
- /**
- * Maps the input. Join of 3 HBase Tables.
- *
- * @param row The row key.
- * @param columns The columns of the row.
- * @param context The task context.
- * @throws java.io.IOException When mapping the input fails.
- */
- // vv AnalyzeData
- @Override
- public void map(ImmutableBytesWritable row, Result columns, Context context)
- throws IOException {
- try {
- //getHouseHoldID();
- for (KeyValue kv : columns.list()) {
- String value = Bytes.toStringBinary(kv.getValue());
- //hs.add(Bytes.toStringBinary(kv.getValue()));
- /* parsing the json for contentID and HouseholdID*/
- JSONObject jsonObj = (JSONObject) parser.parse(value);
- JSONObject payload = (JSONObject) jsonObj.get("payload"); //for parsing contentID
- Long contentId = (Long) payload.get("contentId"); //get the contentId
- String channelId = (String) payload.get("channelId"); //channelId
- String personId = (String) payload.get("personId"); //personId
- String contentID = String.valueOf(contentId); //cast it to a string
- String televisionID = (String) payload.get("televisionId"); //televisionId
- JSONObject header = (JSONObject) jsonObj.get("header"); //for parsing HouseholdID
- String HouseHoldId = (String) header.get("identifier"); //get the householdID
- long timestamp = (Long) header.get("timestamp");
- /* end parsing of contentID and HouseholdID*/
- //if parsing is not needed, send the variable 'value' directly to contentjoin method
- mapper_output = contentidxjoin(contentID);
- reducer_input = HouseHoldId +" "+mapper_output +" "+channelId+" "+personId+" "+televisionID +" "+timestamp;
- if(reducer_input!=null) {
- text.set(reducer_input);
- //System.out.println("Reducer input is " + reducer_input);
- context.write(text,ONE);
- }
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- }
- public static String contentidxjoin(String contentId) {
- Configuration conf = HBaseConfiguration.create();
- conf.set("hbase.zookeeper.quorum", "10.10.100.187");
- @SuppressWarnings("resource")
- HTable table;
- try {
- table = new HTable(conf, ContentidxTable);
- if(table!= null) {
- Get get1 = new Get(Bytes.toBytes(contentId));
- get1.addColumn(Bytes.toBytes(ContentidxTable_ColumnFamily), Bytes.toBytes(ContentidxTable_ColumnQualifier));
- Result result1 = table.get(get1);
- byte[] val1 = result1.getValue(Bytes.toBytes(ContentidxTable_ColumnFamily),
- Bytes.toBytes(ContentidxTable_ColumnQualifier));
- //System.out.println("Value: " + Bytes.toString(val1));
- return_value = contentjoin(Bytes.toString(val1),contentId);
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- return return_value;
- }
- public static String contentjoin(String value,String contentID) {
- Configuration conf = HBaseConfiguration.create();
- conf.set("hbase.zookeeper.quorum", "10.10.100.187");
- @SuppressWarnings("resource")
- HTable table;
- try {
- table = new HTable(conf, ContentTable);
- if(table!=null) {
- Get get1 = new Get(Bytes.toBytes(value));
- get1.addFamily(Bytes.toBytes(ContentTable_ColumnFamily));
- Result result1 = table.get(get1);
- //print out the name
- name = result1.getValue(Bytes.toBytes(ContentTable_ColumnFamily),
- Bytes.toBytes("name"));
- //print out the duration
- duration = result1.getValue(Bytes.toBytes(ContentTable_ColumnFamily),
- Bytes.toBytes("duration"));
- //print out start time
- start = result1.getValue(Bytes.toBytes(ContentTable_ColumnFamily),
- Bytes.toBytes("start"));
- start_string = Bytes.toString(start);
- date = start_string.split("T");
- act_date = date[0];
- time = date[1];
- time = time.replace("Z", "");
- //print out genre
- genre = result1.getValue(Bytes.toBytes(ContentTable_ColumnFamily),
- Bytes.toBytes("genre"));
- //print out type
- type = result1.getValue(Bytes.toBytes(ContentTable_ColumnFamily),
- Bytes.toBytes("type"));
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- return contentID + " "+ Bytes.toString(name).replaceAll("\\s","") +" "+ Bytes.toString(duration) +" "+ act_date +" "+ time +" "+ Bytes.toString(genre)+" "+ Bytes.toString(type);
- //return Bytes.toString(duration) + " "+ Bytes.toString(name) +" "+ contentID +" "+ Bytes.toString(start) +" "+ Bytes.toString(genre)+" "+ Bytes.toString(type);
- }
- }
- /*
- * Reducer to implement GroupBY of the mapper output and dump the result to a file.
- */
- public static class AnalyzeReducerTable
- extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
- private JSONParser parser = new JSONParser();
- public static final byte[] HouseHold_CF = HouseHoldTable_ColumnFamily.getBytes();
- public static final byte[] HouseHold_a = HouseHoldTable_ColumnQualifier.getBytes();
- int count = 0;
- String[] splitStr;
- public static final byte[] CF = OutputTable_ColumnFamily.getBytes();
- int[] time_slot = {0,2,4,7,9,12,14,17,19,21,24};
- public static final byte[] a = OutputTable_ColumnQualifier.getBytes();
- int key_row_count = 1;
- int row_count = 1;
- String current_HouseHoldId="";
- @Override
- protected void reduce(Text key, Iterable<IntWritable> values,
- Context context) throws IOException, InterruptedException {
- Configuration HouseHold_conf = HBaseConfiguration.create();
- HouseHolds_conf.set("hbase.zookeeper.quorum", "10.10.100.187");
- @SuppressWarnings("resource")
- HTable HBase_HouseHoldtable = new HTable(HouseHold_conf, HouseHoldTable);
- String column_family = "";
- int min=0,max=0;
- JSONObject obj = new JSONObject();
- JSONObject obj1 = new JSONObject();
- JSONObject start_end_time = new JSONObject();
- try{
- for (IntWritable one : values) count++;
- String base = key.toString();
- if(base!=null) {
- splitStr = base.split("\\s+");
- System.out.println(key.toString());
- if(!(current_HouseHoldId.equals(splitStr[0]))) {
- set.clear();
- }
- SimpleDateFormat sdf = new SimpleDateFormat("MM-dd-yyyy EEE HH:mm:ss aaa");
- sdf.setTimeZone(new SimpleTimeZone(SimpleTimeZone.UTC_TIME, "UTC"));
- String date1 = sdf.format(new java.util.Date (unix_time));
- //System.out.println(unix_time/1000);
- String[] time_split = date1.split(" ");
- String[] time_bin = time_split[2].split(":");
- int hr = Integer.parseInt(time_bin[0]);
- for(int i=0;i<time_slot.length;i++){
- if(time_slot[i]<=hr && time_slot[i+1]>=hr){
- min = time_slot[i];
- max = time_slot[i+1];
- }
- }
- String session_time_bin = "TIME_" + String.valueOf(min) + "_TO_" +String.valueOf(max);
- String row_key;
- row_key = splitStr[0] +"-"+ (hr_bin);
- //byte[] row_key_byte = HouseHoldId_byte + Bytes.toBytes("-") + hr_bin_byte;
- byte[] start_time=null;
- //get operation
- Get get = new Get(Bytes.toBytes(splitStr[0]));
- get.addColumn(HouseHold_CF, HouseHold_a);
- Result result1 = HBase_HouseHoldtable.get(get);
- if(result1!= null) {
- start_time = result1.getValue(HouseHold_CF,
- HouseHold_a);
- }
- if(Bytes.toString(start_time) != null) {
- //System.out.println(Bytes.toString(start_time));
- Put HouseHoldput = new Put(Bytes.toBytes(splitStr[0]));
- String value1 = Bytes.toString(start_time);
- JSONObject jsonObj = (JSONObject) parser.parse(value1);
- long start_actual_time = (Long) jsonObj.get("start_time");
- try {
- long end_actual_time = (Long) jsonObj.get("end_time");
- set.add(end_actual_time);
- System.out.println("End time" + end_actual_time);
- }
- catch(Exception e) {
- }
- System.out.println("Start time" + start_actual_time);
- set.add(start_actual_time);
- set.add(hr_bin);
- //myList.add(hr_bin);
- Object array_min = Collections.min(set);
- Object array_max = Collections.max(set);
- if(!(array_min.equals(array_max))) {
- start_end_time.put("start_time", array_min);
- start_end_time.put("end_time", array_max);
- } else {
- start_end_time.put("start_time", array_min);
- }
- HouseHoldput.add(HouseHold_CF,HouseHold_a,Bytes.toBytes(start_end_time.toString()));
- HBase_HouseHoldtable.put(HouseHoldput);
- } else {
- Put HouseHoldput = new Put(Bytes.toBytes(splitStr[0]));
- //start_end_time.put("start_time", Bytes.toString(start_time));
- start_end_time.put("start_time", hr_bin);
- //myList.add((long)hr_bin);
- HouseHoldput.add(HouseHold_CF,HouseHold_a,Bytes.toBytes(start_end_time.toString()));
- HBase_HouseHoldtable.put(HouseHoldput);
- set.clear();
- }
- obj1.put("contentId", splitStr[1]);
- obj1.put("name", splitStr[2]);
- obj1.put("duration", splitStr[3]);
- obj1.put("content_date", splitStr[4]);
- obj1.put("content_time", splitStr[5]);
- obj1.put("genre", splitStr[6]);
- obj1.put("type", splitStr[7]);
- obj1.put("channelId", splitStr[8]);
- obj1.put("session_time_bin", session_time_bin);
- obj1.put("personId", splitStr[9]);
- obj1.put("televisionId", splitStr[10]);
- obj1.put("timestamp", date1);
- obj.put(splitStr[0], obj1 );
- Put put = new Put(Bytes.toBytes(row_key));
- column_family = "" + (min_diff*60);
- put.add(CF, Bytes.toBytes(column_family), Bytes.toBytes(obj1.toString())); //put it to a HBase table
- context.write(null, put);
- }
- }
- catch(Exception e) {
- e.printStackTrace();
- }
- }
- }
- public static class AnalyzeReducerFile
- extends Reducer<Text, IntWritable, Text, IntWritable> {
- private Text output_text = new Text();
- String[] splitStr;
- int[] time_slot = {2,4,7,9,12,14,17,19,21,24};
- public static final byte[] CF = "cf".getBytes();
- @SuppressWarnings("unchecked")
- @Override
- protected void reduce(Text key, Iterable<IntWritable> values,
- Context context) throws IOException, InterruptedException {
- int count = 0;
- String column_family = "";
- int min=0,max=0;
- JSONObject obj = new JSONObject();
- JSONObject obj1 = new JSONObject();
- try {
- for (IntWritable one : values) count++;
- String base = key.toString();
- if(base!=null) {
- System.out.println(base);
- splitStr = base.split("\\s+");
- String[] time_split = splitStr[5].split(":");
- int hr = Integer.parseInt(time_split[0]);
- for(int i=0;i<time_slot.length;i++){
- if(time_slot[i]<=hr && time_slot[i+1]>=hr){
- min = time_slot[i];
- max = time_slot[i+1];
- }
- }
- String session_time_bin = "TIME_" + String.valueOf(min) + "_TO_" +String.valueOf(max);
- long unix_time = Long.parseLong(splitStr[11]);
- long hr_bin = ((unix_time/1000) - ((unix_time/1000) % 3600));
- String min_bin = new java.text.SimpleDateFormat("mm").format(new java.util.Date (hr_bin*1000));
- String min_actual = new java.text.SimpleDateFormat("mm").format(new java.util.Date (unix_time));
- int min_diff = Integer.parseInt(min_actual) - Integer.parseInt(min_bin);
- String row_key = splitStr[0] +"-"+ (hr_bin);
- System.out.println("Unix Time" + unix_time/1000);
- System.out.println("HR_BIN" + hr_bin);
- String date1 = new java.text.SimpleDateFormat("MM-dd-yyyy EEE HH:mm:ss aaa").format(new java.util.Date (unix_time));
- obj1.put("contentId", splitStr[1]);
- obj1.put("name", splitStr[2]);
- obj1.put("duration", splitStr[3]);
- obj1.put("actual_time", splitStr[5]);
- obj1.put("genre", splitStr[6]);
- obj1.put("type", splitStr[7]);
- obj1.put("channelId", splitStr[8]);
- obj1.put("session_time_bin", session_time_bin);
- obj1.put("personId", splitStr[9]);
- obj1.put("televisionId", splitStr[10]);
- obj1.put("timestamp", date1);
- obj.put(splitStr[0], obj1 );
- output_text.set(obj.toString());
- context.write(output_text, new IntWritable(count));
- }
- }
- catch(Exception e) {
- e.printStackTrace();
- }
- }
- }
- public static void main(String[] args) throws IOException,InterruptedException,ClassNotFoundException {
- Properties prop = new Properties();
- int count = 0;
- try {
- //load a properties file from class path, inside static method
- prop.load(SampleJob.class.getClassLoader().getResourceAsStream("config.properties"));
- //get the property value and print it out
- ViewersTable=prop.getProperty("ViewersTable");
- ViewersTable_ColumnFamily=prop.getProperty("ViewersTable_ColumnFamily");
- ViewersTable_ColumnQualifier=prop.getProperty("ViewersTable_ColumnQualifier");
- ContentidxTable=prop.getProperty("ContentidxTable");
- ContentidxTable_ColumnFamily=prop.getProperty("ContentidxTable_ColumnFamily");
- ContentidxTable_ColumnQualifier=prop.getProperty("ContentidxTable_ColumnQualifier");
- ContentTable=prop.getProperty("ContentTable");
- ContentTable_ColumnFamily=prop.getProperty("ContentTable_ColumnFamily");
- ContentTable_ColumnQualifier=prop.getProperty("ContentTable_ColumnQualifier");
- OutputTable=prop.getProperty("OutputTable");
- OutputTable_ColumnFamily=prop.getProperty("OutputTable_ColumnFamily");
- OutputTable_ColumnQualifier=prop.getProperty("OutputTable_ColumnQualifier");
- HouseHoldTable=prop.getProperty("HouseHoldTable");
- HouseHoldTable_ColumnFamily=prop.getProperty("HouseHoldTable_ColumnFamily");
- HouseHoldTable_ColumnQualifier=prop.getProperty("HouseHoldTable_ColumnQualifier");
- } catch (IOException ex) {
- throw new IOException("Config file issue");
- }
- Configuration conf = HBaseConfiguration.create();
- conf.set("hbase.zookeeper.quorum", "10.10.100.170");
- Job job = new Job(conf, ViewersTable);
- if(job!=null) {
- job.setJarByClass(SampleJob.class); // class that contains mapper
- Scan scan = new Scan();
- scan.addColumn(Bytes.toBytes(ViewersTable_ColumnFamily), Bytes.toBytes(ViewersTable_ColumnQualifier));
- //scan.addColumn(Bytes.toBytes("cfa"), Bytes.toBytes("l"));
- String option = "t";
- TableMapReduceUtil.initTableMapperJob(
- ViewersTable, // input HBase table name
- scan, // Scan instance to control CF and attribute selection
- AnalyzeMapper.class, // mapper
- Text.class, // mapper output key
- IntWritable.class, // mapper output value
- job);
- if(option.equals("t")) {
- TableMapReduceUtil.initTableReducerJob(
- OutputTable, // output table
- AnalyzeReducerTable.class, // reducer class
- job);
- job.setNumReduceTasks(1);
- }
- else {
- throw new IOException("Not a valid command line option.");
- }
- boolean b = job.waitForCompletion(true);
- set.clear();
- // boolean b = true;
- if (!b) {
- throw new IOException("error with job!");
- }
- else {
- System.out.println("Job is completed!");
- }
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement