Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package it.batch.dataloader;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import javax.sql.DataSource;
import org.apache.log4j.Logger;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameter;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor;
import org.springframework.batch.core.configuration.support.MapJobRegistry;
import org.springframework.batch.core.configuration.support.ReferenceJobFactory;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.explore.support.JobExplorerFactoryBean;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.job.builder.SimpleJobBuilder;
import org.springframework.batch.core.launch.support.SimpleJobOperator;
import org.springframework.batch.core.repository.dao.AbstractJdbcBatchMetadataDao;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.LineMapper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.jdbc.core.JdbcTemplate;
- @Configuration
- @EnableBatchProcessing
- @EnableAutoConfiguration
- public class DataLoaderBatchConfiguration extends AbstractBatchConfiguration {
- private static final Logger LOG = Logger.getLogger(DataLoaderBatchConfiguration.class);
- private static final String JOB_NAME = JobName.job20.name();
- @Value("${batch.file.dir.input}")
- private String inputDir;
- @Value("${batch.file.dir.input.archive}")
- private String archiveInputDir;
- @Value("${batch.file.name.prefix}")
- private String fileNamePrefix;
- @Value("${batch.file.type}")
- private String fileType;
- private JobExplorer jobExplorer;
- private JobRegistry jobRegistry;
- protected Step stepLoadFile() {
- return stepBuilderFactory().get(StepName.step1.name()).
- <RecordLine, RecordLine>chunk(commitInterval.intValue()).
- reader(readerForStepLoadFile()).
- processor(processorForStepLoadFile()).
- writer(writerForStepLoadFile()).build();
- }
- private ItemWriter<FileRecordLine> writerForStepLoadFile() {
- return new LoadFileItemWriter<FileRecordLine>(dataSource);
- }
- private ItemProcessor<FileRecordLine, FileRecordLine> processorForStepLoadFile() {
- return new LoadFileItemProcessor(dataSource);
- }
- private ItemReader<FileRecordLine> readerForStepLoadFile() {
- FlatFileItemReader<FileRecordLine> singleReader = new FlatFileItemReader<FileRecordLine>();
- singleReader.setLineMapper(new LineMapper<FileRecordLine>() {
- @Override
- public FileRecordLine mapLine(String line, int lineNumber) throws Exception {
- return new FileRecordLine(line, lineNumber);
- }
- });
- singleReader.setResource(new FileSystemResource(FileUtil.filesForDir(inputDir, fileNamePrefix)[0]));
- return singleReader;
- }
- protected Job job() throws Exception {
- JobBuilder jobBuilder = jobBuilderFactory().get(JOB_NAME);
- Map<StepName, Step> stepMap = new LinkedHashMap<StepName, Step>();
- Step step = stepLoadFile();
- stepMap.put(StepName.valueOf(step.getName()), step);
- SimpleJobBuilder simpleJobBuilder = StepSequenceManager.simpleStepSequence(jobBuilder, stepSequence, stepMap);
- return simpleJobBuilder.build();
- }
- @Bean
- public JobExecution jobExecution() throws Exception {
- Map<String, JobParameter> jobParametersMap = new HashMap<String, JobParameter>();
- jobParametersMap.put("inputDir", new JobParameter(inputDir, false));
- jobParametersMap.put("archiveInputDir", new JobParameter(archiveInputDir, false));
- jobParametersMap.put("fileNamePrefix", new JobParameter(fileNamePrefix, false));
- jobParametersMap.put("fileType", new JobParameter(fileType, false));
- JobExecution run = null;
- Job job = job();
- ReferenceJobFactory referenceJobFactory = new ReferenceJobFactory(job);
- jobRegistry.register(referenceJobFactory);
- SimpleJobOperator jobOperator = new SimpleJobOperator();
- jobOperator.setJobLauncher(jobLauncher);
- jobOperator.setJobRepository(jobRepository);
- jobOperator.setJobRegistry(jobRegistry);
- jobOperator.setJobExplorer(jobExplorer);
- List<Long> jobInstances = jobOperator.getJobInstances(JOB_NAME, 0, 10);
- if (!jobInstances.isEmpty()) {
- Long instanceId = jobInstances.get(0);
- List<Long> executions = jobOperator.getExecutions(instanceId);
- if (!executions.isEmpty()) {
- Long executionId = executions.get(0);
- JobExecution jobExecution = jobExplorer.getJobExecution(executionId);
- if (jobExecution.getStatus().equals(BatchStatus.FAILED)) {
- try {
- Long restartId = jobOperator.restart(executionId);
- run = jobExplorer.getJobExecution(restartId);
- } catch (Exception e) {
- LOG.error("Error resuming job " + executionId + ", a new job instance will be created. Cause: " + e.getLocalizedMessage());
- }
- }
- }
- }
- if (run == null) {
- run = jobLauncher.run(job, jobParameters(jobParametersMap));
- }
- return addToExecutionExitCodeGenerator(run);
- }
- @Bean
- public JobExplorer jobExplorer(final DataSource dataSource) throws Exception {
- final JobExplorerFactoryBean bean = new JobExplorerFactoryBean();
- bean.setDataSource(dataSource);
- bean.setTablePrefix(AbstractJdbcBatchMetadataDao.DEFAULT_TABLE_PREFIX);
- bean.setJdbcOperations(new JdbcTemplate(dataSource));
- bean.afterPropertiesSet();
- return this.jobExplorer = bean.getObject();
- }
- @Bean
- public JobRegistry jobRegistry() throws Exception {
- return jobRegistry = new MapJobRegistry();
- }
- @Bean
- public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(JobRegistry jobRegistry) {
- JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor = new JobRegistryBeanPostProcessor();
- jobRegistryBeanPostProcessor.setJobRegistry(jobRegistry);
- return jobRegistryBeanPostProcessor;
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement