Guest User

Untitled

a guest
Jul 19th, 2018
111
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 6.06 KB | None | 0 0
  1. Regex: ^([^ ]*) +([^ ]*) +([^ ]*) +\[([^\]]*)\] +([^ ]*) +([^ ]*) +([^ ]*) +\"([^\"]*)\" +([^ ]*) +\"([^\"]*)\" +\"([^\"]*)\" +\"([^\"]*)\" +\(([^\)]*)\) +\"([^\"]*)\".*$
  2. SOURCES:
  3. KEY: nginxAccessLog --> Lfs["TextLine[['offset', 'line']->[ALL]]"]["../data/scribd-access-log-small"]"]
  4. SINKS:
  5. KEY: pipeUrlPathsByBrowserId --> Hfs["TextLine[['offset', 'line']->[ALL]]"]["output/url_paths_by_browser_id/"]"]
  6. KEY: pipeReqsById --> Hfs["TextLine[['offset', 'line']->[ALL]]"]["output/reqs_by_ip/"]"]
  7. KEY: pipeUrlPaths --> Hfs["TextLine[['offset', 'line']->[ALL]]"]["output/url_paths/"]"]
  8. KEY: pipeParse --> Hfs["TextLine[['offset', 'line']->[ALL]]"]["output/logs/"]"]
  9. TRAPS:
  10. KEY: pipeParse --> Hfs["TextLine[['offset', 'line']->[ALL]]"]["output/traps/"]"]
  11. 09/12/07 15:16:27 INFO flow.MultiMapReducePlanner: using application jar: /Users/ijcd/work/hadoop/nginxlog/./build/nginxlogparser.jar
  12. Exception in thread "main" cascading.flow.PlannerException: not all head pipes bound to source taps, remaining head pipe names: [pipeParse], remaining sources: [nginxAccessLog]
  13. at cascading.flow.FlowPlanner.verifyPipeAssemblyEndPoints(FlowPlanner.java:204)
  14. at cascading.flow.FlowPlanner.verifyAssembly(FlowPlanner.java:71)
  15. at cascading.flow.MultiMapReducePlanner.buildFlow(MultiMapReducePlanner.java:169)
  16. at cascading.flow.FlowConnector.connect(FlowConnector.java:452)
  17. at nginxlogparser.Main.main(Unknown Source)
  18. at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  19. at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
  20. at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
  21. at java.lang.reflect.Method.invoke(Method.java:597)
  22. at org.apache.hadoop.util.RunJar.main(RunJar.java:156)
  23.  
  24.  
  25.  
  26. Code follows:
  27.  
  28. // TODO: parse args and have jobs be selectable by an arg
  29. public static void main(String[] args) throws UnsupportedEncodingException {
  30.  
  31. String inputPath = args[0];
  32. String trapsPath = args[1] + "/traps/";
  33. String logsPath = args[1] + "/logs/";
  34. String reqsPath = args[1] + "/reqs_by_ip/";
  35. String urlPathsByBrowserId = args[1] + "/url_paths_by_browser_id/";
  36. String urlPathsPath = args[1] + "/url_paths/";
  37.  
  38. // set the current job jar
  39. Properties properties = new Properties();
  40. FlowConnector.setApplicationJarClass(properties, Main.class);
  41.  
  42. // connectors
  43. FlowConnector flowConnector = new FlowConnector(properties);
  44. CascadeConnector cascadeConnector = new CascadeConnector();
  45.  
  46. // source
  47. Tap sourceNginxLogs = new Lfs(new TextLine(), inputPath);
  48.  
  49. // sinks
  50. Tap sinkLogs = new Hfs(new TextLine(), logsPath, SinkMode.REPLACE);
  51. Tap sinkReqsById = new Hfs(new TextLine(), reqsPath, SinkMode.REPLACE);
  52. Tap sinkUrlPathsByBrowserId = new Hfs(new TextLine(), urlPathsByBrowserId, SinkMode.REPLACE);
  53. Tap sinkUrlPaths = new Hfs(new TextLine(), urlPathsPath, SinkMode.REPLACE);
  54. Tap trapsTap = new Hfs(new TextLine(), trapsPath, SinkMode.REPLACE);
  55.  
  56. // create an assembly to pipeParse an nginx log file and store on an HDFS cluster
  57. Pipe pipeParse = new Pipe("pipeParse");
  58. pipeParse = new ParseNginxLogSubAssembly(pipeParse);
  59.  
  60. // count requests by browser id
  61. Pipe pipeReqsById = new Pipe("pipeReqsById", pipeParse);
  62. pipeReqsById = new RequestsByBrowserIdSubAssembly(pipeReqsById);
  63.  
  64. // build the paths by browser id
  65. Pipe pipeUrlPathsByBrowserId = new Pipe("pipeUrlPathsByBrowserId", pipeParse);
  66. pipeUrlPathsByBrowserId = new UrlPathsByBrowserIdSubAssembly(pipeUrlPathsByBrowserId);
  67.  
  68. // find the most common paths
  69. Pipe pipeUrlPaths = new Pipe("pipeUrlPaths", pipeUrlPathsByBrowserId);
  70. pipeUrlPaths = new GroupBy(pipeUrlPaths, new Fields("path_length", "path"));
  71. pipeUrlPaths = new Every (pipeUrlPaths, new Count());
  72. pipeUrlPaths = new GroupBy(pipeUrlPaths, new Fields("count", "path_length", "path"));
  73. pipeUrlPaths = new Each (pipeUrlPaths, new Fields("count", "path_length", "path"), new Identity());
  74.  
  75. // build some traps for errant data
  76. Map<String,Tap> sources = new HashMap<String,Tap>();
  77. sources.put("nginxAccessLog", sourceNginxLogs);
  78.  
  79. // wire together pipes and sinks
  80. Map<String, Tap> sinks = Cascades.tapsMap(
  81. Pipe.pipes(
  82. pipeParse,
  83. pipeReqsById,
  84. pipeUrlPathsByBrowserId,
  85. pipeUrlPaths),
  86. Tap.taps(
  87. sinkLogs,
  88. sinkReqsById,
  89. sinkUrlPathsByBrowserId,
  90. sinkUrlPaths)
  91. );
  92.  
  93. // build some traps for errant data
  94. Map<String,Tap> traps = new HashMap<String,Tap>();
  95. traps.put("pipeParse", trapsTap);
  96.  
  97. System.out.println("SOURCES:");
  98. for(String key: sources.keySet())
  99. System.out.println("KEY: " + key + " --> " + sources.get(key));
  100.  
  101. System.out.println("SINKS:");
  102. for(String key: sinks.keySet())
  103. System.out.println("KEY: " + key + " --> " + sinks.get(key));
  104.  
  105. System.out.println("TRAPS:");
  106. for(String key: traps.keySet())
  107. System.out.println("KEY: " + key + " --> " + traps.get(key));
  108.  
  109. Flow flowNginx = flowConnector.connect(
  110. null,
  111. sources,
  112. sinks,
  113. traps,
  114. pipeParse,
  115. pipeReqsById,
  116. pipeUrlPathsByBrowserId,
  117. pipeUrlPaths);
  118. flowNginx.writeDOT("flowNginx.dot");
  119.  
  120. // connect the flows by their dependencies, order is not significant
  121. Cascade cascade = cascadeConnector.connect(flowNginx);
  122.  
  123. // execute the cascade, which in turn executes each flow in dependency order
  124. cascade.complete();
  125. }
Add Comment
Please, Sign In to add comment