Not a member of Pastebin yet?
Sign Up —
it unlocks many cool features!
- Regex: ^([^ ]*) +([^ ]*) +([^ ]*) +\[([^\]]*)\] +([^ ]*) +([^ ]*) +([^ ]*) +\"([^\"]*)\" +([^ ]*) +\"([^\"]*)\" +\"([^\"]*)\" +\"([^\"]*)\" +\(([^\)]*)\) +\"([^\"]*)\".*$
- SOURCES:
- KEY: nginxAccessLog --> Lfs["TextLine[['offset', 'line']->[ALL]]"]["../data/scribd-access-log-small"]"]
- SINKS:
- KEY: pipeUrlPathsByBrowserId --> Hfs["TextLine[['offset', 'line']->[ALL]]"]["output/url_paths_by_browser_id/"]"]
- KEY: pipeReqsById --> Hfs["TextLine[['offset', 'line']->[ALL]]"]["output/reqs_by_ip/"]"]
- KEY: pipeUrlPaths --> Hfs["TextLine[['offset', 'line']->[ALL]]"]["output/url_paths/"]"]
- KEY: pipeParse --> Hfs["TextLine[['offset', 'line']->[ALL]]"]["output/logs/"]"]
- TRAPS:
- KEY: pipeParse --> Hfs["TextLine[['offset', 'line']->[ALL]]"]["output/traps/"]"]
- 09/12/07 15:16:27 INFO flow.MultiMapReducePlanner: using application jar: /Users/ijcd/work/hadoop/nginxlog/./build/nginxlogparser.jar
- Exception in thread "main" cascading.flow.PlannerException: not all head pipes bound to source taps, remaining head pipe names: [pipeParse], remaining sources: [nginxAccessLog]
- at cascading.flow.FlowPlanner.verifyPipeAssemblyEndPoints(FlowPlanner.java:204)
- at cascading.flow.FlowPlanner.verifyAssembly(FlowPlanner.java:71)
- at cascading.flow.MultiMapReducePlanner.buildFlow(MultiMapReducePlanner.java:169)
- at cascading.flow.FlowConnector.connect(FlowConnector.java:452)
- at nginxlogparser.Main.main(Unknown Source)
- at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
- at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
- at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
- at java.lang.reflect.Method.invoke(Method.java:597)
- at org.apache.hadoop.util.RunJar.main(RunJar.java:156)
- Code follows:
- // TODO: parse args and have jobs be selectable by an arg
- public static void main(String[] args) throws UnsupportedEncodingException {
- String inputPath = args[0];
- String trapsPath = args[1] + "/traps/";
- String logsPath = args[1] + "/logs/";
- String reqsPath = args[1] + "/reqs_by_ip/";
- String urlPathsByBrowserId = args[1] + "/url_paths_by_browser_id/";
- String urlPathsPath = args[1] + "/url_paths/";
- // set the current job jar
- Properties properties = new Properties();
- FlowConnector.setApplicationJarClass(properties, Main.class);
- // connectors
- FlowConnector flowConnector = new FlowConnector(properties);
- CascadeConnector cascadeConnector = new CascadeConnector();
- // source
- Tap sourceNginxLogs = new Lfs(new TextLine(), inputPath);
- // sinks
- Tap sinkLogs = new Hfs(new TextLine(), logsPath, SinkMode.REPLACE);
- Tap sinkReqsById = new Hfs(new TextLine(), reqsPath, SinkMode.REPLACE);
- Tap sinkUrlPathsByBrowserId = new Hfs(new TextLine(), urlPathsByBrowserId, SinkMode.REPLACE);
- Tap sinkUrlPaths = new Hfs(new TextLine(), urlPathsPath, SinkMode.REPLACE);
- Tap trapsTap = new Hfs(new TextLine(), trapsPath, SinkMode.REPLACE);
- // create an assembly to pipeParse an nginx log file and store on an HDFS cluster
- Pipe pipeParse = new Pipe("pipeParse");
- pipeParse = new ParseNginxLogSubAssembly(pipeParse);
- // count requests by browser id
- Pipe pipeReqsById = new Pipe("pipeReqsById", pipeParse);
- pipeReqsById = new RequestsByBrowserIdSubAssembly(pipeReqsById);
- // build the paths by browser id
- Pipe pipeUrlPathsByBrowserId = new Pipe("pipeUrlPathsByBrowserId", pipeParse);
- pipeUrlPathsByBrowserId = new UrlPathsByBrowserIdSubAssembly(pipeUrlPathsByBrowserId);
- // find the most common paths
- Pipe pipeUrlPaths = new Pipe("pipeUrlPaths", pipeUrlPathsByBrowserId);
- pipeUrlPaths = new GroupBy(pipeUrlPaths, new Fields("path_length", "path"));
- pipeUrlPaths = new Every (pipeUrlPaths, new Count());
- pipeUrlPaths = new GroupBy(pipeUrlPaths, new Fields("count", "path_length", "path"));
- pipeUrlPaths = new Each (pipeUrlPaths, new Fields("count", "path_length", "path"), new Identity());
- // build some traps for errant data
- Map<String,Tap> sources = new HashMap<String,Tap>();
- sources.put("nginxAccessLog", sourceNginxLogs);
- // wire together pipes and sinks
- Map<String, Tap> sinks = Cascades.tapsMap(
- Pipe.pipes(
- pipeParse,
- pipeReqsById,
- pipeUrlPathsByBrowserId,
- pipeUrlPaths),
- Tap.taps(
- sinkLogs,
- sinkReqsById,
- sinkUrlPathsByBrowserId,
- sinkUrlPaths)
- );
- // build some traps for errant data
- Map<String,Tap> traps = new HashMap<String,Tap>();
- traps.put("pipeParse", trapsTap);
- System.out.println("SOURCES:");
- for(String key: sources.keySet())
- System.out.println("KEY: " + key + " --> " + sources.get(key));
- System.out.println("SINKS:");
- for(String key: sinks.keySet())
- System.out.println("KEY: " + key + " --> " + sinks.get(key));
- System.out.println("TRAPS:");
- for(String key: traps.keySet())
- System.out.println("KEY: " + key + " --> " + traps.get(key));
- Flow flowNginx = flowConnector.connect(
- null,
- sources,
- sinks,
- traps,
- pipeParse,
- pipeReqsById,
- pipeUrlPathsByBrowserId,
- pipeUrlPaths);
- flowNginx.writeDOT("flowNginx.dot");
- // connect the flows by their dependencies, order is not significant
- Cascade cascade = cascadeConnector.connect(flowNginx);
- // execute the cascade, which in turn executes each flow in dependency order
- cascade.complete();
- }
Add Comment
Please Sign In to add a comment