Advertisement
giancarloparma

DataLoaderBatchConfiguration

Dec 6th, 2018
2,357
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 6.50 KB | None | 0 0
  1. package it.batch.dataloader;
  2.  
  3. import javax.sql.DataSource;
  4.  
  5. import org.apache.log4j.Logger;
  6. import org.springframework.batch.core.BatchStatus;
  7. import org.springframework.batch.core.Job;
  8. import org.springframework.batch.core.JobExecution;
  9. import org.springframework.batch.core.JobParameter;
  10. import org.springframework.batch.core.Step;
  11. import org.springframework.batch.core.configuration.JobRegistry;
  12. import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
  13. import org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor;
  14. import org.springframework.batch.core.configuration.support.MapJobRegistry;
  15. import org.springframework.batch.core.configuration.support.ReferenceJobFactory;
  16. import org.springframework.batch.core.explore.JobExplorer;
  17. import org.springframework.batch.core.explore.support.JobExplorerFactoryBean;
  18. import org.springframework.batch.core.job.builder.JobBuilder;
  19. import org.springframework.batch.core.job.builder.SimpleJobBuilder;
  20. import org.springframework.batch.core.launch.support.SimpleJobOperator;
  21. import org.springframework.batch.core.repository.dao.AbstractJdbcBatchMetadataDao;
  22. import org.springframework.batch.item.ItemProcessor;
  23. import org.springframework.batch.item.ItemReader;
  24. import org.springframework.batch.item.ItemWriter;
  25. import org.springframework.batch.item.file.FlatFileItemReader;
  26. import org.springframework.batch.item.file.LineMapper;
  27. import org.springframework.beans.factory.annotation.Value;
  28. import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
  29. import org.springframework.context.annotation.Bean;
  30. import org.springframework.context.annotation.Configuration;
  31. import org.springframework.core.io.FileSystemResource;
  32. import org.springframework.jdbc.core.JdbcTemplate;
  33.  
  34. @Configuration
  35. @EnableBatchProcessing
  36. @EnableAutoConfiguration
  37. public class DataLoaderBatchConfiguration extends AbstractBatchConfiguration {
  38.     private static final Logger LOG = Logger.getLogger(DataLoaderBatchConfiguration.class);
  39.    
  40.     private static final String JOB_NAME = JobName.job20.name();
  41.    
  42.     @Value("${batch.file.dir.input}")
  43.     private String inputDir;
  44.    
  45.     @Value("${batch.file.dir.input.archive}")
  46.     private String archiveInputDir;
  47.    
  48.     @Value("${batch.file.name.prefix}")
  49.     private String fileNamePrefix;
  50.    
  51.     @Value("${batch.file.type}")
  52.     private String fileType;
  53.    
  54.     private JobExplorer jobExplorer;
  55.    
  56.     private JobRegistry jobRegistry;
  57.    
  58.     protected Step stepLoadFile() {
  59.         return stepBuilderFactory().get(StepName.step1.name()).
  60.                 <RecordLine, RecordLine>chunk(commitInterval.intValue()).
  61.                     reader(readerForStepLoadFile()).
  62.                     processor(processorForStepLoadFile()).
  63.                     writer(writerForStepLoadFile()).build();
  64.     }
  65.    
  66.     private ItemWriter<FileRecordLine> writerForStepLoadFile() {
  67.         return new LoadFileItemWriter<FileRecordLine>(dataSource);
  68.     }
  69.  
  70.     private ItemProcessor<FileRecordLine, FileRecordLine> processorForStepLoadFile() {
  71.         return new LoadFileItemProcessor(dataSource);
  72.     }
  73.  
  74.     private ItemReader<FileRecordLine> readerForStepLoadFile() {
  75.         FlatFileItemReader<FileRecordLine> singleReader = new FlatFileItemReader<FileRecordLine>();
  76.         singleReader.setLineMapper(new LineMapper<FileRecordLine>() {
  77.             @Override
  78.             public FileRecordLine mapLine(String line, int lineNumber) throws Exception {
  79.                 return new FileRecordLine(line, lineNumber);
  80.             }
  81.         });
  82.        
  83.         singleReader.setResource(new FileSystemResource(FileUtil.filesForDir(inputDir, fileNamePrefix)[0]));
  84.        
  85.         return singleReader;
  86.     }
  87.    
  88.     protected Job job() throws Exception {
  89.         JobBuilder jobBuilder = jobBuilderFactory().get(JOB_NAME);
  90.         Map<StepName, Step> stepMap = new LinkedHashMap<StepName, Step>();
  91.        
  92.         Step step = stepLoadFile();
  93.         stepMap.put(StepName.valueOf(step.getName()), step);
  94.        
  95.         SimpleJobBuilder simpleJobBuilder = StepSequenceManager.simpleStepSequence(jobBuilder, stepSequence, stepMap);
  96.         return simpleJobBuilder.build();
  97.     }
  98.    
  99.     @Bean
  100.     public JobExecution jobExecution() throws Exception {
  101.         Map<String, JobParameter> jobParametersMap = new HashMap<String, JobParameter>();
  102.         jobParametersMap.put("inputDir", new JobParameter(inputDir, false));
  103.         jobParametersMap.put("archiveInputDir", new JobParameter(archiveInputDir, false));
  104.         jobParametersMap.put("fileNamePrefix", new JobParameter(fileNamePrefix, false));
  105.         jobParametersMap.put("fileType", new JobParameter(fileType, false));
  106.        
  107.         JobExecution run = null;
  108.        
  109.         Job job = job();
  110.         ReferenceJobFactory referenceJobFactory = new ReferenceJobFactory(job);
  111.         jobRegistry.register(referenceJobFactory);
  112.        
  113.         SimpleJobOperator jobOperator = new SimpleJobOperator();
  114.         jobOperator.setJobLauncher(jobLauncher);
  115.         jobOperator.setJobRepository(jobRepository);
  116.         jobOperator.setJobRegistry(jobRegistry);
  117.         jobOperator.setJobExplorer(jobExplorer);
  118.        
  119.         List<Long> jobInstances = jobOperator.getJobInstances(JOB_NAME, 0, 10);
  120.         if (!jobInstances.isEmpty()) {
  121.             Long instanceId = jobInstances.get(0);
  122.             List<Long> executions = jobOperator.getExecutions(instanceId);
  123.             if (!executions.isEmpty()) {
  124.                 Long executionId = executions.get(0);
  125.                 JobExecution jobExecution = jobExplorer.getJobExecution(executionId);
  126.                 if (jobExecution.getStatus().equals(BatchStatus.FAILED)) {
  127.                     try {
  128.                         Long restartId = jobOperator.restart(executionId);
  129.                         run = jobExplorer.getJobExecution(restartId);
  130.                     } catch (Exception e) {
  131.                         LOG.error("Error resuming job " + executionId + ", a new job instance will be created. Cause: " + e.getLocalizedMessage());
  132.                     }
  133.                 }
  134.             }
  135.         }
  136.        
  137.         if (run == null) {
  138.             run = jobLauncher.run(job, jobParameters(jobParametersMap));
  139.         }
  140.        
  141.         return addToExecutionExitCodeGenerator(run);
  142.     }
  143.    
  144.     @Bean
  145.     public JobExplorer jobExplorer(final DataSource dataSource) throws Exception {
  146.         final JobExplorerFactoryBean bean = new JobExplorerFactoryBean();
  147.         bean.setDataSource(dataSource);
  148.         bean.setTablePrefix(AbstractJdbcBatchMetadataDao.DEFAULT_TABLE_PREFIX);
  149.         bean.setJdbcOperations(new JdbcTemplate(dataSource));
  150.         bean.afterPropertiesSet();
  151.         return this.jobExplorer = bean.getObject();
  152.     }
  153.    
  154.     @Bean
  155.     public JobRegistry jobRegistry() throws Exception {
  156.             return jobRegistry = new MapJobRegistry();
  157.     }
  158.    
  159.     @Bean
  160.     public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(JobRegistry jobRegistry) {
  161.         JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor = new JobRegistryBeanPostProcessor();
  162.         jobRegistryBeanPostProcessor.setJobRegistry(jobRegistry);
  163.         return jobRegistryBeanPostProcessor;
  164.     }
  165.    
  166. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement