Guest User

Untitled

a guest
Jul 19th, 2018
92
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.83 KB | None | 0 0
  1. @Configuration
  2. public class JobConfiguration {
  3.  
  4. ...
  5. ...
  6. ...
  7.  
  8. @Bean
  9. public JdbcCursorItemReader<Notification> reader() {
  10. JdbcCursorItemReader<Notification> reader = new JdbcCursorItemReader<Notification>();
  11. reader.setDataSource(dataSource);
  12.  
  13. ...
  14. ...
  15. reader.setRowMapper(new BeanPropertyRowMapper<>(Notification.class));
  16. return reader;
  17. }
  18.  
  19. @Bean
  20. public NotificationItemProcessor notificatonProcessor() {
  21. return new NotificationItemProcessor();
  22. }
  23.  
  24. @Bean
  25. public JdbcBatchItemWriter<Notification> updateWriter() {
  26. JdbcBatchItemWriter<Notification> writer = new JdbcBatchItemWriter<Notification>();
  27. writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Notification>());
  28. ...
  29. writer.setDataSource(dataSource);
  30. return writer;
  31. }
  32.  
  33.  
  34. /**
  35. * Composite Exchange Writer
  36. * @return
  37. * @throws InstantiationException
  38. * @throws IllegalAccessException
  39. */
  40. @SuppressWarnings("unchecked")
  41. @Bean
  42. public CompositeItemWriter<Notification> compositeExchangeWriter() throws InstantiationException, IllegalAccessException {
  43. HashMap<String, Object> map = new HashMap<String, Object>();
  44. map.put(ExchangeRouter.INSERT_EXCHANGE_FOR_NOTIFICATION.getActionName(), exchangeWorkflowWriter());
  45. map.put(ExchangeRouter.INSERT_EXCHANGE_FOR_PACK.getActionName(), exchangeWriter());
  46. map.put(ExchangeRouter.DO_NOTHING.getActionName(), doNothing());
  47. return new CompositeItemWriterBuilder(map, ExchangeWriterRouterClassifier.class).build();
  48. }
  49.  
  50. @Bean
  51. public JdbcBatchItemWriter<Notification> exchangeWorkflowWriter() {
  52. JdbcBatchItemWriter<Notification> writer = new JdbcBatchItemWriter<Notification>();
  53. writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Notification>());
  54.  
  55. writer.setSql(" INSERT INTO SOME TABLE..");
  56.  
  57. writer.setDataSource(dataSource);
  58. return writer;
  59. }
  60.  
  61. @Bean
  62. public JdbcBatchItemWriter<Notification> exchangeWriter() {
  63. JdbcBatchItemWriter<Notification> writer = new JdbcBatchItemWriter<Notification>();
  64. writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Notification>());
  65.  
  66. writer.setSql("INSERT INTO SOME OTHER TABLE.");
  67.  
  68. writer.setDataSource(dataSource);
  69. return writer;
  70. }
  71.  
  72. @Bean
  73. public ItemWriter<Document> doNothing() {
  74. return new DummyWriter();
  75. }
  76.  
  77. @Bean
  78. public Job generatePdf(JobCompletionNotificationListener listener) throws InstantiationException, IllegalAccessException {
  79. return jobBuilderFactory.get("generatePdf")
  80. .incrementer(new RunIdIncrementer())
  81. .flow(treatStock())
  82. .end()
  83. .build();
  84. }
  85.  
  86. @Bean
  87. public Step treatStock() throws InstantiationException, IllegalAccessException {
  88. return stepBuilderFactory.get("treatStock")
  89. .<Notification, Notification>chunk(1)
  90. .reader(reader())
  91. .processor(notificatonProcessor())
  92. .writer(compositeExchangeWriter())
  93. .writer(updateWriter())
  94. .build();
  95. }
  96.  
  97. }
  98.  
  99. public class CompositeItemWriterBuilder extends CompositeItemBuilder<CompositeItemWriter> {
  100.  
  101. public CompositeItemWriterBuilder(HashMap<String, Object> matcherMap, Class<?> routerDelegate) throws InstantiationException, IllegalAccessException {
  102. BackToBackPatternClassifier classif = new BackToBackPatternClassifier();
  103. classif.setRouterDelegate(routerDelegate.newInstance());
  104. classif.setMatcherMap(matcherMap);
  105.  
  106. ClassifierCompositeItemWriter classifier = new ClassifierCompositeItemWriter();
  107. classifier.setClassifier(classif);
  108.  
  109. this.delegates.add(classifier);
  110.  
  111. }
  112.  
  113. public CompositeItemWriterBuilder(List<Object> delegates) {
  114. this.delegates = delegates;
  115. }
  116.  
  117. @Override
  118. protected Class<?> getCompositeItem() {
  119. return CompositeItemWriter.class;
  120. }
  121. }
  122.  
  123. public abstract class CompositeItemBuilder<T> {
  124.  
  125. protected List<Object> delegates = new ArrayList<Object>();
  126.  
  127. @SuppressWarnings("unchecked")
  128. public T build() throws InstantiationException, IllegalAccessException {
  129.  
  130. Object compositeItem = getCompositeItem().newInstance();
  131. Method setDelegates = ReflectionUtils.findMethod(compositeItem.getClass(), "setDelegates", List.class);
  132. ReflectionUtils.invokeMethod(setDelegates,compositeItem, delegates);
  133.  
  134. return (T) compositeItem;
  135. }
  136.  
  137. abstract protected Class<?> getCompositeItem();
  138. }
  139.  
  140. public class ExchangeWriterRouterClassifier {
  141.  
  142. @Classifier
  143. public String classify(Notification notification) {
  144. return notification.getExchangesWorkflow().getRouter().getActionName();
  145. }
  146.  
  147. }
Add Comment
Please, Sign In to add comment