// Recovered from the diff view of paste mxY4AqBA; the page header and diff markers that were here are not part of the program.
public class SampleJob {
2
	
3
		public static String ViewersTable=null;
4
		public static String ViewersTable_ColumnFamily=null;
5
		public static String ViewersTable_ColumnQualifier=null;
6
		public static String ContentidxTable=null;
7
		public static String ContentidxTable_ColumnFamily=null;
8
		public static String ContentidxTable_ColumnQualifier=null;
9
		public static String ContentTable=null;
10
		public static String ContentTable_ColumnFamily=null;
11
		public static String ContentTable_ColumnQualifier=null;
12
		public static String OutputTable=null;
13
		public static String OutputTable_ColumnQualifier=null;
14
		public static String OutputTable_ColumnFamily=null;
15
		public static String HouseHoldTable=null;
16
		public static String HouseHoldTable_ColumnQualifier=null;
17
		public static String HouseHoldTable_ColumnFamily=null;
18
		static HashSet<Long> set = new HashSet<Long>();
19
	/**
20
   * Implements the Mapper that reads the data and extracts the
21
   * required information.
22
   */
23
  static class AnalyzeMapper extends TableMapper<Text, IntWritable>  { 
24
25
	  private JSONParser parser = new JSONParser();
26
	  private IntWritable ONE = new IntWritable(1);
27
	  HashSet<String> hs = new HashSet<String>();
28
	  public static byte[] name, duration,start,genre,type;
29
	  static String start_string;
30
	  	static String[] date;
31
	  	static String act_date;
32
	  	static String time;
33
	  public static String return_value;
34
	  String mapper_output,reducer_input;
35
	  private Text text = new Text();
36
	  
37
    /**
38
     * Maps the input. Join of 3 HBase Tables.
39
     *
40
     * @param row The row key.
41
     * @param columns The columns of the row.
42
     * @param context The task context.
43
     * @throws java.io.IOException When mapping the input fails.
44
     */
45
    // vv AnalyzeData
46
    @Override
47
    public void map(ImmutableBytesWritable row, Result columns, Context context)
48
    throws IOException {
49
    	try {
50
    		//getHouseHoldID();
51
            for (KeyValue kv : columns.list()) {
52
            	String value = Bytes.toStringBinary(kv.getValue());
53
            	//hs.add(Bytes.toStringBinary(kv.getValue()));
54
            	
55
            	/* parsing the json for contentID and HouseholdID*/
56
            	
57
            	JSONObject jsonObj = (JSONObject) parser.parse(value);
58
            	JSONObject payload = (JSONObject) jsonObj.get("payload"); //for parsing contentID
59
            	Long contentId = (Long) payload.get("contentId"); //get the contentId
60
            	String channelId = (String) payload.get("channelId"); //channelId
61
            	String personId = (String) payload.get("personId"); //personId
62
            	String contentID = String.valueOf(contentId); //cast it to a string
63
            	String televisionID = (String) payload.get("televisionId"); //televisionId
64
            	JSONObject header = (JSONObject) jsonObj.get("header"); //for parsing HouseholdID
65
            	String HouseHoldId = (String) header.get("identifier"); //get the householdID
66
            	long timestamp = (Long) header.get("timestamp");
67
            	
68
            	/* end parsing of contentID and HouseholdID*/
69
            	
70
            	//if parsing is not needed, send the variable 'value' directly to contentjoin method
71
            	
72
            	mapper_output = contentidxjoin(contentID);
73
            	reducer_input = HouseHoldId +" "+mapper_output +" "+channelId+" "+personId+" "+televisionID +" "+timestamp;
74
            	if(reducer_input!=null) {
75
            	text.set(reducer_input);
76
            	//System.out.println("Reducer input is " + reducer_input);
77
            	context.write(text,ONE);
78
            	}
79
            }
80
    	}
81
    	catch (Exception e) {
82
            e.printStackTrace(); 
83
    	}
84
  }
85
	public static String contentidxjoin(String contentId) {
86
		  Configuration conf = HBaseConfiguration.create();
87
		  conf.set("hbase.zookeeper.quorum", "10.10.100.187");
88
		  @SuppressWarnings("resource")
89
		HTable table;
90
		try {
91
			table = new HTable(conf, ContentidxTable);
92
			if(table!= null) {
93
			Get get1 = new Get(Bytes.toBytes(contentId));
94
			get1.addColumn(Bytes.toBytes(ContentidxTable_ColumnFamily), Bytes.toBytes(ContentidxTable_ColumnQualifier));
95
			Result result1 = table.get(get1); 
96
			byte[] val1 = result1.getValue(Bytes.toBytes(ContentidxTable_ColumnFamily),
97
			      Bytes.toBytes(ContentidxTable_ColumnQualifier));
98
		  //System.out.println("Value: " + Bytes.toString(val1));
99
			return_value = contentjoin(Bytes.toString(val1),contentId);
100
			} 
101
		}
102
		
103
		catch (Exception e) {
104
			e.printStackTrace();
105
		}
106
		return return_value;
107
		
108
	  }
109
    public static String contentjoin(String value,String contentID) {
110
    	
111
		  Configuration conf = HBaseConfiguration.create();
112
		conf.set("hbase.zookeeper.quorum", "10.10.100.187");
113
		  @SuppressWarnings("resource")
114
		HTable table;
115
		try {
116
			  table = new HTable(conf, ContentTable);
117
			  if(table!=null) {
118
			  Get get1 = new Get(Bytes.toBytes(value));
119
			  get1.addFamily(Bytes.toBytes(ContentTable_ColumnFamily));
120
			  Result result1 = table.get(get1);
121
			  
122
			  //print out the name
123
			  name = result1.getValue(Bytes.toBytes(ContentTable_ColumnFamily),
124
				      Bytes.toBytes("name"));
125
			  //print out the duration
126
			  duration = result1.getValue(Bytes.toBytes(ContentTable_ColumnFamily),
127
				      Bytes.toBytes("duration"));
128
			  //print out start time
129
			  start = result1.getValue(Bytes.toBytes(ContentTable_ColumnFamily),
130
				      Bytes.toBytes("start"));
131
			  start_string = Bytes.toString(start);
132
			  date = start_string.split("T");
133
			  act_date = date[0];
134
			  time = date[1];
135
			  time = time.replace("Z", "");
136
			  //print out genre
137
			  genre = result1.getValue(Bytes.toBytes(ContentTable_ColumnFamily),
138
				      Bytes.toBytes("genre"));
139
			//print out type
140
			  type = result1.getValue(Bytes.toBytes(ContentTable_ColumnFamily),
141
				      Bytes.toBytes("type"));
142
			  }
143
		}
144
		catch (Exception e) {
145
			e.printStackTrace();
146
		}
147
		return contentID + " "+ Bytes.toString(name).replaceAll("\\s","") +" "+ Bytes.toString(duration) +" "+ act_date +" "+ time +" "+ Bytes.toString(genre)+" "+ Bytes.toString(type);
148
		//return Bytes.toString(duration) + " "+ Bytes.toString(name) +" "+ contentID +" "+ Bytes.toString(start) +" "+ Bytes.toString(genre)+" "+ Bytes.toString(type);
149
    }
150
  } 
151
  /*
152
   * Reducer to implement GroupBY of the mapper output and dump the result to a file.
153
  */  
154
public static class AnalyzeReducerTable  
155
  extends TableReducer<Text, IntWritable, ImmutableBytesWritable> { 
156
	  private JSONParser parser = new JSONParser();
157
	  public static final byte[] HouseHold_CF = HouseHoldTable_ColumnFamily.getBytes();
158
	  public static final byte[] HouseHold_a = HouseHoldTable_ColumnQualifier.getBytes();
159
	  int count = 0;
160
	  String[] splitStr;
161
	  public static final byte[] CF = OutputTable_ColumnFamily.getBytes();
162
	  int[] time_slot = {0,2,4,7,9,12,14,17,19,21,24};
163
	  public static final byte[] a = OutputTable_ColumnQualifier.getBytes();
164
	  int key_row_count = 1;
165
	  int row_count = 1;
166
	  String current_HouseHoldId="";
167
    
168
	  @Override
169
    protected void reduce(Text key, Iterable<IntWritable> values,
170
      Context context) throws IOException, InterruptedException {
171
      Configuration HouseHold_conf = HBaseConfiguration.create();
172
	HouseHolds_conf.set("hbase.zookeeper.quorum", "10.10.100.187");
173
  	  @SuppressWarnings("resource")
174
  	  HTable HBase_HouseHoldtable = new HTable(HouseHold_conf, HouseHoldTable);
175
	
176
      String column_family = "";
177
      int min=0,max=0;
178
      JSONObject obj = new JSONObject();
179
      JSONObject obj1 = new JSONObject();
180
      JSONObject start_end_time = new JSONObject();
181
      try{
182
	      for (IntWritable one : values) count++; 
183
	      String base = key.toString();
184
	      if(base!=null) {
185
	      splitStr = base.split("\\s+");
186
	      System.out.println(key.toString());
187
	      if(!(current_HouseHoldId.equals(splitStr[0]))) {
188
	    	  set.clear();
189
	      }
190
	      
191
	      SimpleDateFormat sdf = new SimpleDateFormat("MM-dd-yyyy EEE HH:mm:ss aaa");
192
	      sdf.setTimeZone(new SimpleTimeZone(SimpleTimeZone.UTC_TIME, "UTC"));
193
	      String date1 = sdf.format(new java.util.Date (unix_time));
194
	      //System.out.println(unix_time/1000);
195
	      String[] time_split = date1.split(" ");
196
	      String[] time_bin  = time_split[2].split(":"); 
197
	      int hr = Integer.parseInt(time_bin[0]);
198
	      for(int i=0;i<time_slot.length;i++){
199
	    	  if(time_slot[i]<=hr && time_slot[i+1]>=hr){
200
	    		  min = time_slot[i];
201
	    		  max = time_slot[i+1];
202
	    	  }
203
	      }
204
	      String session_time_bin = "TIME_" + String.valueOf(min) + "_TO_" +String.valueOf(max);
205
	      String row_key;
206
	      row_key = splitStr[0] +"-"+ (hr_bin);
207
	      
208
	      //byte[] row_key_byte = HouseHoldId_byte + Bytes.toBytes("-") + hr_bin_byte;
209
	      byte[] start_time=null;
210
	      //get operation
211
	      Get get = new Get(Bytes.toBytes(splitStr[0]));
212
	      get.addColumn(HouseHold_CF, HouseHold_a);
213
	      Result result1 = HBase_HouseHoldtable.get(get); 
214
	      if(result1!= null) {
215
	      start_time = result1.getValue(HouseHold_CF,
216
			      HouseHold_a);
217
	      }
218
	      if(Bytes.toString(start_time) != null) {
219
	    	  //System.out.println(Bytes.toString(start_time));
220
	      Put HouseHoldput = new Put(Bytes.toBytes(splitStr[0]));
221
	      String value1 = Bytes.toString(start_time);
222
	      JSONObject jsonObj = (JSONObject) parser.parse(value1);
223
      		long start_actual_time = (Long) jsonObj.get("start_time");
224
      		try {
225
      			long end_actual_time = (Long) jsonObj.get("end_time");
226
      			set.add(end_actual_time);     
227
      			System.out.println("End time" + end_actual_time);
228
      			}
229
      		catch(Exception e) {
230
      			
231
      		}
232
      		System.out.println("Start time" + start_actual_time);
233
      		set.add(start_actual_time);
234
      		set.add(hr_bin);
235
      		//myList.add(hr_bin);
236
      		Object array_min = Collections.min(set);
237
      		Object array_max = Collections.max(set);
238
      		if(!(array_min.equals(array_max))) {
239
      			start_end_time.put("start_time", array_min);
240
      			start_end_time.put("end_time", array_max);
241
      		} else {
242
      			start_end_time.put("start_time", array_min);
243
      		}
244
    		  HouseHoldput.add(HouseHold_CF,HouseHold_a,Bytes.toBytes(start_end_time.toString()));
245
		      HBase_HouseHoldtable.put(HouseHoldput);
246
	      } else {
247
	    	  Put HouseHoldput = new Put(Bytes.toBytes(splitStr[0]));
248
	    		  //start_end_time.put("start_time", Bytes.toString(start_time));
249
	    		  start_end_time.put("start_time", hr_bin);
250
	    		  //myList.add((long)hr_bin);
251
	    		  HouseHoldput.add(HouseHold_CF,HouseHold_a,Bytes.toBytes(start_end_time.toString()));
252
			      HBase_HouseHoldtable.put(HouseHoldput);
253
			      set.clear();
254
	      }
255
	      
256
	      obj1.put("contentId", splitStr[1]);
257
	      obj1.put("name", splitStr[2]);
258
	      obj1.put("duration", splitStr[3]);
259
	      obj1.put("content_date", splitStr[4]);
260
	      obj1.put("content_time", splitStr[5]);
261
	      obj1.put("genre", splitStr[6]);
262
	      obj1.put("type", splitStr[7]);
263
	      obj1.put("channelId", splitStr[8]);
264
	      obj1.put("session_time_bin", session_time_bin);
265
	      obj1.put("personId", splitStr[9]);
266
	      obj1.put("televisionId", splitStr[10]);
267
	      obj1.put("timestamp", date1);
268
	      obj.put(splitStr[0], obj1 );
269
	      Put put = new Put(Bytes.toBytes(row_key));
270
	      column_family = "" + (min_diff*60);
271
	      put.add(CF, Bytes.toBytes(column_family), Bytes.toBytes(obj1.toString())); //put it to a HBase table
272
	      context.write(null, put);
273
	      }
274
      }
275
      catch(Exception e) {
276
    	  e.printStackTrace();
277
      }
278
    }
279
  }
280
  
281
  public static class AnalyzeReducerFile
282
  extends Reducer<Text, IntWritable, Text, IntWritable> { 
283
	  private Text output_text = new Text();
284
	  String[] splitStr;
285
	  int[] time_slot = {2,4,7,9,12,14,17,19,21,24};
286
	  public static final byte[] CF = "cf".getBytes();
287
    @SuppressWarnings("unchecked")
288
	@Override
289
    protected void reduce(Text key, Iterable<IntWritable> values,
290
      Context context) throws IOException, InterruptedException {
291
      int count = 0;
292
      String column_family = "";
293
      int min=0,max=0;
294
      JSONObject obj = new JSONObject();
295
      JSONObject obj1 = new JSONObject();
296
      try {
297
	      for (IntWritable one : values) count++; 
298
	      String base = key.toString();
299
	      if(base!=null) {
300
	      System.out.println(base);
301
	      splitStr = base.split("\\s+");
302
	      String[] time_split = splitStr[5].split(":");
303
	      int hr = Integer.parseInt(time_split[0]); 
304
	      for(int i=0;i<time_slot.length;i++){
305
	    	  if(time_slot[i]<=hr && time_slot[i+1]>=hr){
306
	    		  min = time_slot[i];
307
	    		  max = time_slot[i+1];
308
	    	  }
309
	      }
310
	      String session_time_bin = "TIME_" + String.valueOf(min) + "_TO_" +String.valueOf(max);
311
	      long unix_time = Long.parseLong(splitStr[11]);
312
	      long hr_bin = ((unix_time/1000) - ((unix_time/1000) % 3600));
313
	      String min_bin = new java.text.SimpleDateFormat("mm").format(new java.util.Date (hr_bin*1000));
314
	      String min_actual = new java.text.SimpleDateFormat("mm").format(new java.util.Date (unix_time));
315
	      int min_diff = Integer.parseInt(min_actual) - Integer.parseInt(min_bin);
316
	      String row_key = splitStr[0] +"-"+ (hr_bin);
317
	      System.out.println("Unix Time" + unix_time/1000);
318
	      System.out.println("HR_BIN" + hr_bin);
319
	      String date1 = new java.text.SimpleDateFormat("MM-dd-yyyy EEE HH:mm:ss aaa").format(new java.util.Date (unix_time));
320
	      obj1.put("contentId", splitStr[1]);
321
	      obj1.put("name", splitStr[2]);
322
	      obj1.put("duration", splitStr[3]);
323
	      obj1.put("actual_time", splitStr[5]);
324
	      obj1.put("genre", splitStr[6]);
325
	      obj1.put("type", splitStr[7]);
326
	      obj1.put("channelId", splitStr[8]);
327
	      obj1.put("session_time_bin", session_time_bin);
328
	      obj1.put("personId", splitStr[9]);
329
	      obj1.put("televisionId", splitStr[10]);
330
	      obj1.put("timestamp", date1);
331
	      obj.put(splitStr[0], obj1 );
332
	      output_text.set(obj.toString());
333
	      context.write(output_text, new IntWritable(count));
334
	      }
335
      }  
336
      catch(Exception e) {
337
    	  e.printStackTrace();
338
      }
339
    }
340
  }
341
  public static void main(String[] args) throws IOException,InterruptedException,ClassNotFoundException {
342
	  
343
	  Properties prop = new Properties();
344
	  int count = 0;
345
	  
346
	  
347
  	try {
348
	        //load a properties file from class path, inside static method
349
	  		prop.load(SampleJob.class.getClassLoader().getResourceAsStream("config.properties"));     
350
	  		//get the property value and print it out
351
	  		ViewersTable=prop.getProperty("ViewersTable");
352
	  		ViewersTable_ColumnFamily=prop.getProperty("ViewersTable_ColumnFamily");
353
	  		ViewersTable_ColumnQualifier=prop.getProperty("ViewersTable_ColumnQualifier");
354
	  		ContentidxTable=prop.getProperty("ContentidxTable");
355
	  		ContentidxTable_ColumnFamily=prop.getProperty("ContentidxTable_ColumnFamily");
356
	  		ContentidxTable_ColumnQualifier=prop.getProperty("ContentidxTable_ColumnQualifier");
357
	  		ContentTable=prop.getProperty("ContentTable");
358
	  		ContentTable_ColumnFamily=prop.getProperty("ContentTable_ColumnFamily");
359
	  		ContentTable_ColumnQualifier=prop.getProperty("ContentTable_ColumnQualifier");
360
	  		OutputTable=prop.getProperty("OutputTable");
361
	  		OutputTable_ColumnFamily=prop.getProperty("OutputTable_ColumnFamily");
362
	  		OutputTable_ColumnQualifier=prop.getProperty("OutputTable_ColumnQualifier");
363
	  		HouseHoldTable=prop.getProperty("HouseHoldTable");
364
	  		HouseHoldTable_ColumnFamily=prop.getProperty("HouseHoldTable_ColumnFamily");
365
	  		HouseHoldTable_ColumnQualifier=prop.getProperty("HouseHoldTable_ColumnQualifier");
366
367
  	} catch (IOException ex) {
368
  		throw new IOException("Config file issue");
369
      }
370
	Configuration conf = HBaseConfiguration.create();
371
	conf.set("hbase.zookeeper.quorum", "10.10.100.170");
372
    Job job = new Job(conf, ViewersTable);
373
    if(job!=null) {
374
    job.setJarByClass(SampleJob.class);     // class that contains mapper
375
376
    Scan scan = new Scan();
377
    scan.addColumn(Bytes.toBytes(ViewersTable_ColumnFamily), Bytes.toBytes(ViewersTable_ColumnQualifier)); 
378
  //scan.addColumn(Bytes.toBytes("cfa"), Bytes.toBytes("l"));
379
    String option = "t";
380
    TableMapReduceUtil.initTableMapperJob(
381
      ViewersTable,        // input HBase table name
382
      scan,             // Scan instance to control CF and attribute selection
383
      AnalyzeMapper.class,   // mapper
384
      Text.class,             // mapper output key
385
      IntWritable.class,             // mapper output value
386
      job);
387
    if(option.equals("t")) {
388
    TableMapReduceUtil.initTableReducerJob(
389
    		OutputTable,        // output table
390
    		AnalyzeReducerTable.class,    // reducer class
391
    		job);
392
    job.setNumReduceTasks(1);  
393
    }
394
    else {
395
    	throw new IOException("Not a valid command line option.");
396
    }
397
    boolean b = job.waitForCompletion(true);
398
    set.clear();
399
   // boolean b = true;
400
    if (!b) {
401
      throw new IOException("error with job!");
402
    }
403
    else {
404
    System.out.println("Job is completed!"); 
405
    	}
406
    }
407
  }
408
}