Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from("quartz://timerName?trigger.repeatCount=0&trigger.repeatInterval=0")
- .setHeader(Constants.BATCH_CORRELATION_ID, new SimpleExpression("CORRELATION"))
- .setHeader(Constants.LAST_FILE, new ConstantExpression("TRUE"))
- .to("direct:aggregate");
- from("file:///temp/src/data/process?delete=false&move=success&moveFailed=error")
- .setHeader(Constants.BATCH_CORRELATION_ID, new SimpleExpression("CORRELATION"))
- .convertBodyTo(InputStream.class)
- .to("direct:aggregate");
- from("direct:aggregate")
- .aggregate(header(Constants.BATCH_CORRELATION_ID), new GroupedExchangeAggregationStrategy())
- .eagerCheckCompletion()
- .completionPredicate(header(Constants.LAST_FILE).isEqualTo("TRUE"))
- .processRef("fileProcessor");
- onException(ProcessingException.class).processRef("myError")
- .to("file:///temp/src/data/process/error");
- public void process(Exchange exch) throws Exception {
- Message m = exch.getIn();
- List<Exchange> ge = exch.getProperty(Exchange.GROUPED_EXCHANGE, List.class);
- for (Exchange e : ge) {
- String HostfileType = (String) e.getIn().getHeader(Constants.HOST_FILETYPE);
- InputStream fis = e.getIn().getBody(InputStream.class);
- if (e.getIn().getBody()!=null) {
- // do something here
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement