/**
 * Looks up the value of a named field in a parsed log tuple.
 *
 * <p>Tuple positions read by this method:
 * <ul>
 *   <li>the log type, at index 0, or at index 1 when {@code isSourceTaggingUsed} is set
 *       (the file name is then prepended at index 0);</li>
 *   <li>the name of the requested field, at index {@code size - 1};</li>
 *   <li>the key-value portion of the log line, at index {@code size - 2}; this is only
 *       consulted when the field is not part of the schema for the log type.</li>
 * </ul>
 *
 * <p>The schema is loaded on first use via {@code MNMLogParser.parseLogMetadata} and kept
 * in {@code logSchema} for subsequent calls.
 *
 * <p>This method is best-effort: malformed records produce a Pig warning and a
 * {@code null} result rather than an exception.
 *
 * @param input the tuple to read from; may be {@code null}
 * @return the string form of the requested field; the result of
 *         {@code MNMLogParser.parseKeyValue} when the field is not in the schema; or
 *         {@code null} when the input is null, a required element is missing or null,
 *         the log type has no schema, or an index is out of range
 * @throws IOException propagated from the tuple accessors or schema loading, if they raise one
 */
@Override
public String exec(Tuple input) throws IOException {
    if (input == null) {
        warn("Input to LogIndex is null", PigWarning.UDF_WARNING_1);
        return null;
    }

    String field = "";
    String logType = "";
    try {
        // When source tagging is used the file name is prepended at index 0,
        // so every schema-relative position shifts right by one.
        final int offset = isSourceTaggingUsed ? 1 : 0;

        Object logTypeObj = input.get(offset);
        if (logTypeObj == null) {
            warn("Log type at index " + offset + " is null", PigWarning.UDF_WARNING_3);
            return null;
        }
        logType = logTypeObj.toString();

        if (logSchema == null) {
            // Read the file, open as a local file
            logSchema = MNMLogParser.parseLogMetadata(this.logTransformsManager.getTransformsConfLoadPath());
        }
        if (logSchema == null) {
            warn("Log schema could not be loaded; logType is :" + logType, PigWarning.UDF_WARNING_3);
            return null;
        }
        HashMap<String, Integer> logFields = logSchema.get(logType);

        // The requested field name is always the last element of the tuple.
        Object fieldObj = input.get(input.size() - 1);
        if (fieldObj == null) {
            warn("Field name is null; logType is :" + logType, PigWarning.UDF_WARNING_3);
            return null;
        }
        field = fieldObj.toString();

        if (logFields == null) {
            warn("No schema for logType; Field is : " + field + " logType is :" + logType,
                    PigWarning.UDF_WARNING_3);
            return null;
        }

        // Index of field in the schema
        Integer idx = logFields.get(field.toLowerCase());

        // Look in the Key-Value part of log line if the field
        // is not a part of transforms.conf
        if (idx == null) {
            // Assumption: KeyValue part always appears as the last part of log line
            Object keyValObj = input.get(input.size() - 2);
            if (keyValObj == null) {
                warn("Key-value part is null; Field is : " + field + " logType is :" + logType,
                        PigWarning.UDF_WARNING_3);
                return null;
            }
            return MNMLogParser.parseKeyValue(keyValObj.toString(), field);
        }

        Object val = input.get(idx.intValue() + offset);
        if (val == null) {
            warn("Value is null; Field is : " + field + " logType is :" + logType,
                    PigWarning.UDF_WARNING_3);
            return null;
        }
        return val.toString();
    } catch (IndexOutOfBoundsException ie) {
        String msg = "Field is : " + field + " logType is :" + logType;
        warn(msg + " --> " + ie.toString(), PigWarning.UDF_WARNING_2);
        return null;
    } catch (NullPointerException npe) {
        // Last-resort safety net: the expected null cases are handled explicitly above,
        // but the UDF stays best-effort if a collaborator raises an NPE.
        String msg = "Field is : " + field + " logType is :" + logType;
        warn(msg + " --> " + npe.toString(), PigWarning.UDF_WARNING_3);
        return null;
    }
}