Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import org.apache.crunch.MapFn;
- import org.apache.crunch.PCollection;
- import org.apache.crunch.Pipeline;
- import org.apache.crunch.PipelineExecution;
- import org.apache.crunch.impl.mr.MRPipeline;
- import org.apache.crunch.types.writable.Writables;
- public class FailTest {
- public static void main(String[] args) {
- Pipeline pipeline = new MRPipeline(FailTest.class);
- PCollection<String> p = pipeline.readTextFile("/user/jgauci/inputs");
- PCollection<Integer> result = p.parallelDo(new MapFn<String, Integer>() {
- @Override
- public Integer map(String input) {
- int c = 0;
- return 1 / c;
- }
- }, Writables.ints());
- Iterable<Integer> localResult = result.materialize();
- PipelineExecution execution = pipeline.runAsync();
- while (!execution.isDone() && !execution.isCancelled()
- && execution.getStatus() != PipelineExecution.Status.FAILED
- && execution.getResult() == null) {
- try {
- Thread.sleep(1000);
- System.out.println("Job Status: " + execution.getStatus().toString());
- } catch (InterruptedException e) {
- System.err.println("ABORTING");
- e.printStackTrace();
- try {
- execution.kill();
- execution.waitUntilDone();
- } catch (InterruptedException e1) {
- throw new RuntimeException(e1);
- }
- throw new RuntimeException(e);
- }
- }
- System.out.println("Finished running job.");
- if (execution.isCancelled()) {
- throw new RuntimeException("Job failed");
- }
- for (Integer d : localResult) {
- System.out.println(d);
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement