public class SampleJob {
// Table and column settings for the job. main() assigns every field below from
// config.properties; until then they are null.
// NOTE(review): the mapper and reducers read these statics directly, so they only
// see the loaded values when they execute in the same JVM that ran main() —
// confirm how the tasks are launched.

// Input table scanned by the job (its name is also used as the job name) and the
// single column that the scan selects.
public static String ViewersTable=null;
public static String ViewersTable_ColumnFamily=null;
public static String ViewersTable_ColumnQualifier=null;
// Index table read by AnalyzeMapper.contentidxjoin: the row key is a contentId and
// the configured column holds the row key to look up in ContentTable.
public static String ContentidxTable=null;
public static String ContentidxTable_ColumnFamily=null;
public static String ContentidxTable_ColumnQualifier=null;
// Content metadata table read by AnalyzeMapper.contentjoin (columns name, duration,
// start, genre, type within the configured family).
public static String ContentTable=null;
public static String ContentTable_ColumnFamily=null;
// Loaded in main() but not read anywhere else in this class.
public static String ContentTable_ColumnQualifier=null;
// Table that AnalyzeReducerTable writes its Puts to, and the family they go into.
public static String OutputTable=null;
// Only used to initialise AnalyzeReducerTable.a, which is not referenced further
// in this class.
public static String OutputTable_ColumnQualifier=null;
public static String OutputTable_ColumnFamily=null;
// Table in which AnalyzeReducerTable keeps one JSON value per household with its
// start_time / end_time.
public static String HouseHoldTable=null;
public static String HouseHoldTable_ColumnQualifier=null;
public static String HouseHoldTable_ColumnFamily=null;
// Shared scratch set of Long time values from which AnalyzeReducerTable.reduce takes
// the minimum and maximum; main() clears it once the job has finished.
static HashSet<Long> set = new HashSet<Long>();
/**
 * Mapper over the viewers table that joins every viewing event with its
 * content metadata.
 *
 * <p>Each cell value is parsed as a JSON document with a {@code payload}
 * (contentId, channelId, personId, televisionId) and a {@code header}
 * (identifier = household id, timestamp). The contentId is resolved through
 * the content index table and then the content table. The emitted key is a
 * single space-separated line of 12 fields:
 * {@code householdId contentId name duration date time genre type channelId
 * personId televisionId timestamp}; the value is always 1.
 *
 * <p>Events whose content lookup fails are skipped instead of being emitted
 * with missing or stale metadata.
 */
static class AnalyzeMapper extends TableMapper<Text, IntWritable> {
    /** Configuration for the side lookups, built once instead of once per record. */
    private static final Configuration LOOKUP_CONF = createLookupConf();

    private JSONParser parser = new JSONParser();
    private IntWritable ONE = new IntWritable(1);
    private Text text = new Text();

    /**
     * Raw column values read by the most recent {@link #contentjoin} call.
     * They are reset to {@code null} at the start of every call so that a failed
     * lookup can never expose the previous record's values.
     */
    public static byte[] name, duration, start, genre, type;

    /** Result of the most recent {@link #contentidxjoin} call; {@code null} if it failed. */
    public static String return_value;

    /**
     * Maps the input. Join of 3 HBase Tables.
     *
     * @param row The row key.
     * @param columns The columns of the row.
     * @param context The task context.
     * @throws java.io.IOException When writing the mapper output fails or is interrupted.
     */
    @Override
    public void map(ImmutableBytesWritable row, Result columns, Context context)
            throws IOException {
        try {
            for (KeyValue kv : columns.list()) {
                String value = Bytes.toStringBinary(kv.getValue());

                JSONObject jsonObj = (JSONObject) parser.parse(value);
                JSONObject payload = (JSONObject) jsonObj.get("payload");
                JSONObject header = (JSONObject) jsonObj.get("header");

                Long contentId = (Long) payload.get("contentId");
                String contentID = String.valueOf(contentId);
                String channelId = (String) payload.get("channelId");
                String personId = (String) payload.get("personId");
                String televisionID = (String) payload.get("televisionId");
                String HouseHoldId = (String) header.get("identifier");
                long timestamp = (Long) header.get("timestamp");

                String joined = contentidxjoin(contentID);
                if (joined == null) {
                    // Without metadata the key would not have the 12 fields the
                    // reducers index into, so the event is dropped here.
                    System.err.println("Skipping event: no content metadata for contentId " + contentID);
                    continue;
                }

                // NOTE(review): only the content name has its whitespace removed; any
                // other field containing whitespace would shift the columns — confirm
                // that cannot happen in the data.
                String reducerInput = HouseHoldId + " " + joined + " " + channelId + " "
                        + personId + " " + televisionID + " " + timestamp;
                text.set(reducerInput);
                context.write(text, ONE);
            }
        } catch (IOException e) {
            // Output failures must fail the task rather than silently lose records.
            throw e;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while writing mapper output", e);
        } catch (Exception e) {
            // Malformed input (bad JSON, missing header/payload) is best-effort: log and move on.
            e.printStackTrace();
        }
    }

    /**
     * Resolves a content id through the content index table and returns the
     * joined content metadata.
     *
     * @param contentId the content id, used as row key in the index table.
     * @return the space-separated metadata produced by {@link #contentjoin}, or
     *         {@code null} when the id is not indexed or any lookup fails.
     */
    public static String contentidxjoin(String contentId) {
        return_value = null;
        HTable table = null;
        try {
            table = openLookupTable(ContentidxTable);
            byte[] family = Bytes.toBytes(ContentidxTable_ColumnFamily);
            byte[] qualifier = Bytes.toBytes(ContentidxTable_ColumnQualifier);

            Get get = new Get(Bytes.toBytes(contentId));
            get.addColumn(family, qualifier);
            byte[] contentKey = table.get(get).getValue(family, qualifier);

            if (contentKey != null) {
                return_value = contentjoin(Bytes.toString(contentKey), contentId);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            closeQuietly(table);
        }
        return return_value;
    }

    /**
     * Reads name, duration, start, genre and type of one content row and
     * formats them as
     * {@code contentID name duration date time genre type}.
     *
     * <p>The {@code start} column is split at {@code 'T'} into date and time, and
     * a {@code 'Z'} in the time part is removed. Whitespace is removed from the
     * name so that it stays a single token.
     *
     * @param value row key in the content table.
     * @param contentID content id that is placed first in the returned line.
     * @return the formatted line, or {@code null} when the row key is {@code null},
     *         the lookup fails, or the row lacks a name or a {@code date'T'time}
     *         start value.
     */
    public static String contentjoin(String value, String contentID) {
        name = null;
        duration = null;
        start = null;
        genre = null;
        type = null;
        if (value == null) {
            return null;
        }

        HTable table = null;
        try {
            table = openLookupTable(ContentTable);
            byte[] family = Bytes.toBytes(ContentTable_ColumnFamily);

            Get get = new Get(Bytes.toBytes(value));
            get.addFamily(family);
            Result content = table.get(get);

            name = content.getValue(family, Bytes.toBytes("name"));
            duration = content.getValue(family, Bytes.toBytes("duration"));
            start = content.getValue(family, Bytes.toBytes("start"));
            genre = content.getValue(family, Bytes.toBytes("genre"));
            type = content.getValue(family, Bytes.toBytes("type"));
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        } finally {
            closeQuietly(table);
        }

        String startText = Bytes.toString(start);
        if (name == null || startText == null) {
            return null;
        }
        String[] dateAndTime = startText.split("T");
        if (dateAndTime.length < 2) {
            return null;
        }
        String actDate = dateAndTime[0];
        String timeOfDay = dateAndTime[1].replace("Z", "");

        return contentID + " " + Bytes.toString(name).replaceAll("\\s", "") + " "
                + Bytes.toString(duration) + " " + actDate + " " + timeOfDay + " "
                + Bytes.toString(genre) + " " + Bytes.toString(type);
    }

    /** Builds the configuration that points the side lookups at their ZooKeeper quorum. */
    private static Configuration createLookupConf() {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "10.10.100.187");
        return conf;
    }

    /** Opens the named table for a lookup; the caller is responsible for closing it. */
    private static HTable openLookupTable(String tableName) throws IOException {
        return new HTable(LOOKUP_CONF, tableName);
    }

    /** Closes a lookup table, logging instead of propagating a failure to close. */
    private static void closeQuietly(HTable table) {
        if (table == null) {
            return;
        }
        try {
            table.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
/*
* Reducer to implement GroupBY of the mapper output and dump the result to a file.
*/
public static class AnalyzeReducerTable
extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
private JSONParser parser = new JSONParser();
public static final byte[] HouseHold_CF = HouseHoldTable_ColumnFamily.getBytes();
public static final byte[] HouseHold_a = HouseHoldTable_ColumnQualifier.getBytes();
int count = 0;
String[] splitStr;
public static final byte[] CF = OutputTable_ColumnFamily.getBytes();
int[] time_slot = {0,2,4,7,9,12,14,17,19,21,24};
public static final byte[] a = OutputTable_ColumnQualifier.getBytes();
int key_row_count = 1;
int row_count = 1;
String current_HouseHoldId="";
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
Configuration HouseHold_conf = HBaseConfiguration.create();
HouseHolds_conf.set("hbase.zookeeper.quorum", "10.10.100.187");
@SuppressWarnings("resource")
HTable HBase_HouseHoldtable = new HTable(HouseHold_conf, HouseHoldTable);
String column_family = "";
int min=0,max=0;
JSONObject obj = new JSONObject();
JSONObject obj1 = new JSONObject();
JSONObject start_end_time = new JSONObject();
try{
for (IntWritable one : values) count++;
String base = key.toString();
if(base!=null) {
splitStr = base.split("\\s+");
System.out.println(key.toString());
if(!(current_HouseHoldId.equals(splitStr[0]))) {
set.clear();
}
SimpleDateFormat sdf = new SimpleDateFormat("MM-dd-yyyy EEE HH:mm:ss aaa");
sdf.setTimeZone(new SimpleTimeZone(SimpleTimeZone.UTC_TIME, "UTC"));
String date1 = sdf.format(new java.util.Date (unix_time));
//System.out.println(unix_time/1000);
String[] time_split = date1.split(" ");
String[] time_bin = time_split[2].split(":");
int hr = Integer.parseInt(time_bin[0]);
for(int i=0;i<time_slot.length;i++){
if(time_slot[i]<=hr && time_slot[i+1]>=hr){
min = time_slot[i];
max = time_slot[i+1];
}
}
String session_time_bin = "TIME_" + String.valueOf(min) + "_TO_" +String.valueOf(max);
String row_key;
row_key = splitStr[0] +"-"+ (hr_bin);
//byte[] row_key_byte = HouseHoldId_byte + Bytes.toBytes("-") + hr_bin_byte;
byte[] start_time=null;
//get operation
Get get = new Get(Bytes.toBytes(splitStr[0]));
get.addColumn(HouseHold_CF, HouseHold_a);
Result result1 = HBase_HouseHoldtable.get(get);
if(result1!= null) {
start_time = result1.getValue(HouseHold_CF,
HouseHold_a);
}
if(Bytes.toString(start_time) != null) {
//System.out.println(Bytes.toString(start_time));
Put HouseHoldput = new Put(Bytes.toBytes(splitStr[0]));
String value1 = Bytes.toString(start_time);
JSONObject jsonObj = (JSONObject) parser.parse(value1);
long start_actual_time = (Long) jsonObj.get("start_time");
try {
long end_actual_time = (Long) jsonObj.get("end_time");
set.add(end_actual_time);
System.out.println("End time" + end_actual_time);
}
catch(Exception e) {
}
System.out.println("Start time" + start_actual_time);
set.add(start_actual_time);
set.add(hr_bin);
//myList.add(hr_bin);
Object array_min = Collections.min(set);
Object array_max = Collections.max(set);
if(!(array_min.equals(array_max))) {
start_end_time.put("start_time", array_min);
start_end_time.put("end_time", array_max);
} else {
start_end_time.put("start_time", array_min);
}
HouseHoldput.add(HouseHold_CF,HouseHold_a,Bytes.toBytes(start_end_time.toString()));
HBase_HouseHoldtable.put(HouseHoldput);
} else {
Put HouseHoldput = new Put(Bytes.toBytes(splitStr[0]));
//start_end_time.put("start_time", Bytes.toString(start_time));
start_end_time.put("start_time", hr_bin);
//myList.add((long)hr_bin);
HouseHoldput.add(HouseHold_CF,HouseHold_a,Bytes.toBytes(start_end_time.toString()));
HBase_HouseHoldtable.put(HouseHoldput);
set.clear();
}
obj1.put("contentId", splitStr[1]);
obj1.put("name", splitStr[2]);
obj1.put("duration", splitStr[3]);
obj1.put("content_date", splitStr[4]);
obj1.put("content_time", splitStr[5]);
obj1.put("genre", splitStr[6]);
obj1.put("type", splitStr[7]);
obj1.put("channelId", splitStr[8]);
obj1.put("session_time_bin", session_time_bin);
obj1.put("personId", splitStr[9]);
obj1.put("televisionId", splitStr[10]);
obj1.put("timestamp", date1);
obj.put(splitStr[0], obj1 );
Put put = new Put(Bytes.toBytes(row_key));
column_family = "" + (min_diff*60);
put.add(CF, Bytes.toBytes(column_family), Bytes.toBytes(obj1.toString())); //put it to a HBase table
context.write(null, put);
}
}
catch(Exception e) {
e.printStackTrace();
}
}
}
/**
 * Reducer that turns each distinct mapper key into a JSON line and emits it
 * together with the number of times the key occurred.
 */
public static class AnalyzeReducerFile
        extends Reducer<Text, IntWritable, Text, IntWritable> {
    /** Number of whitespace-separated fields a mapper key must contain. */
    private static final int EXPECTED_FIELDS = 12;

    private Text output_text = new Text();
    /**
     * Boundaries (hours of the day) of the session time bins. Starts at 0, like
     * the table reducer's slots, so that hours 0 and 1 fall into a bin too.
     */
    int[] time_slot = {0,2,4,7,9,12,14,17,19,21,24};
    public static final byte[] CF = "cf".getBytes();

    /**
     * Processes one mapper key.
     *
     * <p>The key is split on whitespace into: householdId, contentId, name,
     * duration, content date, content time, genre, type, channelId, personId,
     * televisionId, timestamp (epoch milliseconds). The session time bin is
     * derived from the hour of the content time field. The output key is
     * {@code {"<householdId>": {...event fields...}}} and the output value is the
     * number of values received for the key.
     *
     * @param key the space-separated event line produced by the mapper.
     * @param values one entry per occurrence of the key.
     * @param context the task context the result is written to.
     * @throws IOException when writing the output fails.
     * @throws InterruptedException when writing the output is interrupted.
     */
    @SuppressWarnings("unchecked")
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable ignored : values) {
            count++;
        }

        String base = key.toString();
        System.out.println(base);

        String[] fields = base.split("\\s+");
        if (fields.length < EXPECTED_FIELDS) {
            System.err.println("Skipping malformed key (expected " + EXPECTED_FIELDS + " fields): " + base);
            return;
        }

        try {
            int hr = Integer.parseInt(fields[5].split(":")[0]);
            String session_time_bin = sessionTimeBin(hr);

            long unix_time = Long.parseLong(fields[11]);
            long hr_bin = ((unix_time / 1000) - ((unix_time / 1000) % 3600));
            System.out.println("Unix Time" + unix_time / 1000);
            System.out.println("HR_BIN" + hr_bin);

            // NOTE(review): this formats in the JVM default time zone, whereas
            // AnalyzeReducerTable formats in UTC — confirm which is intended.
            String date1 = new java.text.SimpleDateFormat("MM-dd-yyyy EEE HH:mm:ss aaa")
                    .format(new java.util.Date(unix_time));

            JSONObject obj1 = new JSONObject();
            obj1.put("contentId", fields[1]);
            obj1.put("name", fields[2]);
            obj1.put("duration", fields[3]);
            obj1.put("actual_time", fields[5]);
            obj1.put("genre", fields[6]);
            obj1.put("type", fields[7]);
            obj1.put("channelId", fields[8]);
            obj1.put("session_time_bin", session_time_bin);
            obj1.put("personId", fields[9]);
            obj1.put("televisionId", fields[10]);
            obj1.put("timestamp", date1);

            JSONObject obj = new JSONObject();
            obj.put(fields[0], obj1);

            output_text.set(obj.toString());
            context.write(output_text, new IntWritable(count));
        } catch (IOException e) {
            // Output failures must fail the task instead of silently dropping data.
            throw e;
        } catch (InterruptedException e) {
            throw e;
        } catch (Exception e) {
            // Malformed records (unparsable hour or timestamp) are best-effort: log and continue.
            e.printStackTrace();
        }
    }

    /** Returns "TIME_<from>_TO_<to>" for the time bin containing the given hour (0-23). */
    private String sessionTimeBin(int hr) {
        int min = 0, max = 0;
        for (int i = 0; i < time_slot.length - 1; i++) {
            // Half-open bins: an hour on a boundary belongs to the bin that starts there.
            if (time_slot[i] <= hr && hr < time_slot[i + 1]) {
                min = time_slot[i];
                max = time_slot[i + 1];
                break;
            }
        }
        return "TIME_" + String.valueOf(min) + "_TO_" + String.valueOf(max);
    }
}
/**
 * Loads the table settings from {@code config.properties} on the classpath,
 * then configures and runs the job: a scan of the viewers table through
 * {@link AnalyzeMapper}, reduced by {@link AnalyzeReducerTable} into the
 * output table with a single reduce task.
 *
 * @param args command line arguments; not used.
 * @throws IOException when the configuration cannot be loaded, a required
 *         property is missing, or the job fails.
 * @throws InterruptedException when waiting for the job is interrupted.
 * @throws ClassNotFoundException when a job class cannot be found.
 */
public static void main(String[] args) throws IOException,InterruptedException,ClassNotFoundException {
    Properties prop = new Properties();
    java.io.InputStream in =
            SampleJob.class.getClassLoader().getResourceAsStream("config.properties");
    if (in == null) {
        // getResourceAsStream signals a missing resource with null, not an exception.
        throw new IOException("Config file issue: config.properties not found on the classpath");
    }
    try {
        prop.load(in);
    } catch (IOException ex) {
        throw new IOException("Config file issue", ex);
    } finally {
        try {
            in.close();
        } catch (IOException ignored) {
            // The properties are already loaded (or the load failure is being reported).
        }
    }

    // NOTE(review): the mapper and reducers read these statics directly; they only
    // see these values when they run in this JVM — confirm the task launch mode.
    ViewersTable = requiredProperty(prop, "ViewersTable");
    ViewersTable_ColumnFamily = requiredProperty(prop, "ViewersTable_ColumnFamily");
    ViewersTable_ColumnQualifier = requiredProperty(prop, "ViewersTable_ColumnQualifier");
    ContentidxTable = requiredProperty(prop, "ContentidxTable");
    ContentidxTable_ColumnFamily = requiredProperty(prop, "ContentidxTable_ColumnFamily");
    ContentidxTable_ColumnQualifier = requiredProperty(prop, "ContentidxTable_ColumnQualifier");
    ContentTable = requiredProperty(prop, "ContentTable");
    ContentTable_ColumnFamily = requiredProperty(prop, "ContentTable_ColumnFamily");
    // Not read anywhere in this class, so it stays optional.
    ContentTable_ColumnQualifier = prop.getProperty("ContentTable_ColumnQualifier");
    OutputTable = requiredProperty(prop, "OutputTable");
    OutputTable_ColumnFamily = requiredProperty(prop, "OutputTable_ColumnFamily");
    OutputTable_ColumnQualifier = requiredProperty(prop, "OutputTable_ColumnQualifier");
    HouseHoldTable = requiredProperty(prop, "HouseHoldTable");
    HouseHoldTable_ColumnFamily = requiredProperty(prop, "HouseHoldTable_ColumnFamily");
    HouseHoldTable_ColumnQualifier = requiredProperty(prop, "HouseHoldTable_ColumnQualifier");

    Configuration conf = HBaseConfiguration.create();
    // NOTE(review): the mapper and reducer lookups use quorum 10.10.100.187 while the
    // job uses 10.10.100.170 — confirm that both addresses are intended.
    conf.set("hbase.zookeeper.quorum", "10.10.100.170");

    Job job = new Job(conf, ViewersTable);
    job.setJarByClass(SampleJob.class); // class that contains mapper

    Scan scan = new Scan();
    scan.addColumn(Bytes.toBytes(ViewersTable_ColumnFamily), Bytes.toBytes(ViewersTable_ColumnQualifier));

    TableMapReduceUtil.initTableMapperJob(
            ViewersTable,          // input HBase table name
            scan,                  // Scan instance to control CF and attribute selection
            AnalyzeMapper.class,   // mapper
            Text.class,            // mapper output key
            IntWritable.class,     // mapper output value
            job);

    // Output always goes to the HBase table; AnalyzeReducerFile is not wired in here.
    TableMapReduceUtil.initTableReducerJob(
            OutputTable,               // output table
            AnalyzeReducerTable.class, // reducer class
            job);
    job.setNumReduceTasks(1);

    boolean succeeded = job.waitForCompletion(true);
    set.clear();
    if (!succeeded) {
        throw new IOException("error with job!");
    }
    System.out.println("Job is completed!");
}

/** Returns the value of a property, failing with a clear message when it is absent. */
private static String requiredProperty(Properties prop, String key) throws IOException {
    String value = prop.getProperty(key);
    if (value == null) {
        throw new IOException("Config file issue: missing property '" + key + "'");
    }
    return value;
}
}