@Override public String exec(Tuple input) throws IOException { if (input == null) { warn("Input to LogIndex is null", PigWarning.UDF_WARNING_1); return null; } HashMap logFields; String field = ""; String logType = ""; int logTypeIdx = 0; try { // Shift logType index to 1 if source tagging is used // This is because filename is prepended at 0th index if(isSourceTaggingUsed) logTypeIdx = 1; logType = input.get(logTypeIdx).toString(); if (logSchema == null) { // Read the file, open as a local file logSchema = MNMLogParser.parseLogMetadata(this.logTransformsManager.getTransformsConfLoadPath()); } logFields = logSchema.get(logType); field = input.get(input.size() - 1).toString(); // Index of field in the schema Object idx = logFields.get(field.toLowerCase()); // Look in the Key-Value part of log line if the field // is not a part of transforms.conf if(null == idx) { // Assumption: KeyValue part always appears as the last part of log line String keyValLine = input.get(input.size() - 2).toString(); return MNMLogParser.parseKeyValue(keyValLine, field); } int intIdx = ((Integer)idx).intValue(); // Increment the element's position if source tagging is used if(isSourceTaggingUsed) { intIdx++; } Object val = input.get(intIdx); return val.toString(); } catch (IndexOutOfBoundsException ie) { String msg = "Field is : " + field + " logType is :" + logType; warn(msg + " --> " + ie.toString(), PigWarning.UDF_WARNING_2); return null; } catch (NullPointerException npe) { warn(npe.toString(), PigWarning.UDF_WARNING_3); return null; } }