Advertisement
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
- @Override
- public String exec(Tuple input) throws IOException {
- if (input == null) {
- warn("Input to LogIndex is null", PigWarning.UDF_WARNING_1);
- return null;
- }
- HashMap<String, Integer> logFields;
- String field = "";
- String logType = "";
- int logTypeIdx = 0;
- try {
- // Shift logType index to 1 if source tagging is used
- // This is because filename is prepended at 0th index
- if(isSourceTaggingUsed)
- logTypeIdx = 1;
- logType = input.get(logTypeIdx).toString();
- if (logSchema == null) {
- // Read the file, open as a local file
- logSchema = MNMLogParser.parseLogMetadata(this.logTransformsManager.getTransformsConfLoadPath());
- }
- logFields = logSchema.get(logType);
- field = input.get(input.size() - 1).toString();
- // Index of field in the schema
- Object idx = logFields.get(field.toLowerCase());
- // Look in the Key-Value part of log line if the field
- // is not a part of transforms.conf
- if(null == idx) {
- // Assumption: KeyValue part always appears as the last part of log line
- String keyValLine = input.get(input.size() - 2).toString();
- return MNMLogParser.parseKeyValue(keyValLine, field);
- }
- int intIdx = ((Integer)idx).intValue();
- // Increment the element's position if source tagging is used
- if(isSourceTaggingUsed) {
- intIdx++;
- }
- Object val = input.get(intIdx);
- return val.toString();
- } catch (IndexOutOfBoundsException ie) {
- String msg = "Field is : " + field + " logType is :" + logType;
- warn(msg + " --> " + ie.toString(), PigWarning.UDF_WARNING_2);
- return null;
- } catch (NullPointerException npe) {
- warn(npe.toString(), PigWarning.UDF_WARNING_3);
- return null;
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement