1. @Override
  2. public String exec(Tuple input) throws IOException {
  3. if (input == null) {
  4. warn("Input to LogIndex is null", PigWarning.UDF_WARNING_1);
  5. return null;
  6. }
  7.  
  8. HashMap<String, Integer> logFields;
  9. String field = "";
  10. String logType = "";
  11. int logTypeIdx = 0;
  12.  
  13. try {
  14. // Shift logType index to 1 if source tagging is used
  15. // This is because filename is prepended at 0th index
  16. if(isSourceTaggingUsed)
  17. logTypeIdx = 1;
  18. logType = input.get(logTypeIdx).toString();
  19. if (logSchema == null) {
  20. // Read the file, open as a local file
  21. logSchema = MNMLogParser.parseLogMetadata(this.logTransformsManager.getTransformsConfLoadPath());
  22. }
  23. logFields = logSchema.get(logType);
  24. field = input.get(input.size() - 1).toString();
  25. // Index of field in the schema
  26. Object idx = logFields.get(field.toLowerCase());
  27.  
  28. // Look in the Key-Value part of log line if the field
  29. // is not a part of transforms.conf
  30. if(null == idx) {
  31. // Assumption: KeyValue part always appears as the last part of log line
  32. String keyValLine = input.get(input.size() - 2).toString();
  33. return MNMLogParser.parseKeyValue(keyValLine, field);
  34. }
  35.  
  36. int intIdx = ((Integer)idx).intValue();
  37. // Increment the element's position if source tagging is used
  38. if(isSourceTaggingUsed) {
  39. intIdx++;
  40. }
  41.  
  42. Object val = input.get(intIdx);
  43.  
  44. return val.toString();
  45.  
  46. } catch (IndexOutOfBoundsException ie) {
  47. String msg = "Field is : " + field + " logType is :" + logType;
  48. warn(msg + " --> " + ie.toString(), PigWarning.UDF_WARNING_2);
  49. return null;
  50. } catch (NullPointerException npe) {
  51. warn(npe.toString(), PigWarning.UDF_WARNING_3);
  52. return null;
  53. }
  54. }