Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package com.acheron.batch.config;
- import java.util.List;
- import javax.sql.DataSource;
- import org.springframework.batch.core.Job;
- import org.springframework.batch.core.Step;
- import
- org.springframework.batch.core.configuration.
- annotation.EnableBatchProcessing;
- import org.springframework.batch.core.
- configuration.annotation.JobBuilderFactory;
- import org.springframework.batch.
- core.configuration.annotation.StepBuilderFactory;
- import org.springframework.batch.
- core.configuration.annotation.StepScope;
- import org.springframework.batch.core.explore.JobExplorer;
- import org.springframework.batch.core.
- explore.support.MapJobExplorerFactoryBean;
- import org.springframework.batch.core.launch.support.RunIdIncrementer;
- import org.springframework.batch.core.launch.support.SimpleJobLauncher;
- import org.springframework.batch.core.repository.JobRepository;
- import org.springframework.batch.core.
- repository.support.MapJobRepositoryFactoryBean;
- import org.springframework.batch.item.ItemReader;
- import org.springframework.batch.item.support.IteratorItemReader;
- import org.springframework.batch.
- support.transaction.ResourcelessTransactionManager;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.core.env.Environment;
- import org.springframework.core.task.TaskExecutor;
- import org.springframework.dao.DeadlockLoserDataAccessException;
- import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
- import org.springframework.transaction.PlatformTransactionManager;
- import com.acheron.batch.vo.Contents;
- import com.acheron.batch.vo.Schedules;
- @Configuration
- @EnableBatchProcessing
- public class BatchConfiguaration {
- @Autowired
- private DataSource datasouce;
- @Autowired
- private JobBuilderFactory jobBuilderFactory;
- @Autowired
- private StepBuilderFactory stepBuilderFactory;
- @Autowired
- public Environment env;
- @Bean(name = "reader")
- @StepScope
- public ItemReader<Schedules> reader(@Value("#{stepExecutionContext[scheduleRecs]}") List<Schedules> scherecs) {
- ItemReader<Schedules> reader = new IteratorItemReader<Schedules>(scherecs);
- return reader;
- }
- @Bean(name = "CWSreader")
- @StepScope
- public ItemReader<Contents> CWSreader(@Value("#{stepExecutionContext[scheduleRecs]}") List<Contents> scherecs) {
- ItemReader<Contents> reader = new IteratorItemReader<Contents>(scherecs);
- return reader;
- }
- @SuppressWarnings("rawtypes")
- @Bean
- @StepScope
- public BatchProcessor processor() {
- return new BatchProcessor();
- }
- @SuppressWarnings("rawtypes")
- @Bean
- @StepScope
- public ContentWorkflowBatchProcessor CWSprocessor() {
- return new ContentWorkflowBatchProcessor();
- }
- @Bean(name = "batchSchedulePreparedStatement")
- @StepScope
- public BatchSchedulePreparedStatement batchSchedulePreparedStatement() {
- return new BatchSchedulePreparedStatement();
- }
- @Bean(name = "contentWorkflowPreparedStatement")
- @StepScope
- public ContentWorkflowPreparedStatement contentWorkflowPreparedStatement() {
- return new ContentWorkflowPreparedStatement();
- }
- @Bean(name = "ContentWorkflowBatchWriter")
- @StepScope
- public ContentWorkflowBatchWriter ContentWorkflowBatchWriter() {
- return new ContentWorkflowBatchWriter();
- }
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Bean(name = "batchWriter")
- @StepScope
- public BatchWriter batchWriter() {
- BatchWriter batchWriter = new BatchWriter();
- batchWriter.setDataSource(datasouce);
- batchWriter.setSql(env.getProperty("batch.insert.schedule.query"));
- batchWriter.setItemPreparedStatementSetter(batchSchedulePreparedStatement());
- return batchWriter;
- }
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Bean(name = "CWSbatchWriter")
- @StepScope
- public ContentWorkflowBatchWriter CWSbatchWriter() {
- ContentWorkflowBatchWriter contentWorkflowBatchWriter = new ContentWorkflowBatchWriter();
- contentWorkflowBatchWriter.setDataSource(datasouce);
- contentWorkflowBatchWriter.setSql(env.getProperty("batch.insert.contentworkflow.query"));
- contentWorkflowBatchWriter.setItemPreparedStatementSetter(contentWorkflowPreparedStatement());
- return contentWorkflowBatchWriter;
- }
- @Bean("acheronDbTm")
- @Qualifier("acheronDbTm")
- public PlatformTransactionManager platformTransactionManager() {
- return new ResourcelessTransactionManager();
- }
- @Bean
- public JobExplorer jobExplorer() throws Exception {
- MapJobExplorerFactoryBean explorerFactoryBean = new MapJobExplorerFactoryBean();
- explorerFactoryBean.setRepositoryFactory(mapJobRepositoryFactoryBean());
- explorerFactoryBean.afterPropertiesSet();
- return explorerFactoryBean.getObject();
- }
- @Bean
- public MapJobRepositoryFactoryBean mapJobRepositoryFactoryBean() {
- MapJobRepositoryFactoryBean mapJobRepositoryFactoryBean = new MapJobRepositoryFactoryBean();
- mapJobRepositoryFactoryBean.setTransactionManager(platformTransactionManager());
- return mapJobRepositoryFactoryBean;
- }
- @Bean
- public JobRepository jobRepository() throws Exception {
- return mapJobRepositoryFactoryBean().getObject();
- }
- @Bean
- public SimpleJobLauncher jobLauncher() throws Exception {
- SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
- jobLauncher.setJobRepository(jobRepository());
- return jobLauncher;
- }
- @Bean(name = "batchPartition")
- @StepScope
- public BatchPartition batchPartition() {
- BatchPartition batchPartition = new BatchPartition();
- return batchPartition;
- }
- @Bean(name = "contentWorkflowPartition")
- @StepScope
- public ContentWorkflowPartition ContentWorkflowPartition() {
- ContentWorkflowPartition contentWorkflowPartition = new ContentWorkflowPartition();
- return contentWorkflowPartition;
- }
- @Bean(name="taskExecutor")
- public TaskExecutor taskExecutor() {
- ThreadPoolTaskExecutor poolTaskExecutor = new ThreadPoolTaskExecutor();
- poolTaskExecutor.setCorePoolSize(10);
- poolTaskExecutor.setMaxPoolSize(30);
- poolTaskExecutor.setQueueCapacity(35);
- poolTaskExecutor.setThreadNamePrefix("Acheron");
- poolTaskExecutor.afterPropertiesSet();
- return poolTaskExecutor;
- }
- @Bean(name="CWSTaskExecutor")
- public TaskExecutor CWSTaskExecutor() {
- ThreadPoolTaskExecutor poolTaskExecutor = new ThreadPoolTaskExecutor();
- poolTaskExecutor.setCorePoolSize(10);
- poolTaskExecutor.setMaxPoolSize(30);
- poolTaskExecutor.setQueueCapacity(35);
- poolTaskExecutor.setThreadNamePrefix("Acheron");
- poolTaskExecutor.afterPropertiesSet();
- return poolTaskExecutor;
- }
- @Bean(name = "masterStep")
- public Step masterStep() {
- return stepBuilderFactory.get("masterStep").partitioner(slave()).partitioner("slave", batchPartition())
- .taskExecutor(taskExecutor()).build();
- }
- @Bean(name = "CWSmasterStep")
- public Step CWSmasterStep() {
- return stepBuilderFactory.get("CWSmasterStep").partitioner(CWSslave())
- .partitioner("CWSslave", ContentWorkflowPartition()).taskExecutor(CWSTaskExecutor()).build();
- }
- @Bean(name = "slave")
- public Step slave() {
- return stepBuilderFactory.get("slave").chunk(100).faultTolerant().retryLimit(2)
- .retry(DeadlockLoserDataAccessException.class).reader(reader(null)).processor(processor())
- .writer(batchWriter()).build();
- }
- @Bean(name = "CWSslave")
- public Step CWSslave() {
- return stepBuilderFactory.get("CWSslave").chunk(100).faultTolerant().retryLimit(2)
- .retry(DeadlockLoserDataAccessException.class).reader(CWSreader(null)).processor(processor())
- .writer(CWSbatchWriter()).build();
- }
- @Bean(name = "manageStagingScheduleMaster")
- public Job manageStagingScheduleMaster(final Step masterStep) throws Exception {
- return jobBuilderFactory.get("manageStagingScheduleMaster").preventRestart().incrementer(new RunIdIncrementer())
- .start(masterStep).build();
- @Bean(name = "manageContentWorkflowStaging")
- public Job manageContentWorkflowStaging(final Step CWSmasterStep) throws Exception {
- return jobBuilderFactory.get("manageContentWorkflowStaging").preventRestart()
- .incrementer(new RunIdIncrementer()).start(CWSmasterStep).build();
- }
- }`
- rest:
- api1: http://localhost:8888/v1/daq/schedule?sourceId=7&endDate=11/01/2018&page=1&pageSize=2
- api2: https://api.myjson.com/bins/1gm3ef
- ContentWorkflow: http://localhost:8085/v1/cwdata/programcontentgap?&
- responseCount: 10000
- contentWorkflowType: ContentWorkflow
- batch.insert.schedule.query: 'insert into clouddataflow.STAGING_SCHEDULE_MASTER (SCHEDULE_ID,SOURCE_ID,SOURCE_NAME,AIRING_TYPE_ID,AIRING_TYPE_NAME,CREATED_DATE,CREATED_USER,MODIFIED_DATE,MODIFIED_USER)values(?,?,?,?,?,?,?,?,?)'
- batch.insert.contentworkflow.query: 'insert into clouddataflow.CONTENT_WORKFLOW_STAGING (CONTENT_WORKFLOW_STAGING_ID,PROGRAM_ID,MASTER_TITLE,ORIGINAL_TITLE,PROGRAM_TYPE_ID,PROGRAM_TYPE_NAME,SOURCE_GROUP_ID,SOURCE_GROUP_NAME,REGION,PROGRAM_LANGUAGE,CW_STATUS,COPY_CULTURE,GENRES,KEYWORDS,AIR_DATE_TIME,IS_PROCESSED,CREATED_USER,CREATED_DATE,MODIFIED_USER,MODIFIED_DATE)values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)'
- modifiedUser: admin
- threadCount: 10
- spring.acherondb.url: 'jdbc:mysql://localhost:3306/clouddataflow'
- spring.acherondb.uname: uname
- spring.acherondb.pwd: pwd
- spring.acherondb.dclassName: com.mysql.jdbc.Driver
- spring.acherondb.schema: clouddataflow
- spring.acherondb.showSql: true
- spring.jpa.hibernate.naming.physical-strategy: org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl
- spring:
- batch:
- job:
- enabled: true
- datasource:
- type: com.zaxxer.hikari.HikariDataSource
- hikari:
- jdbc-url: jdbc:mysql://localhost:3306/clouddataflow
- username: uname
- password: pwd
- driver-class-name: com.mysql.jdbc.Driver
- connection-timeout: 300
- idle-timeout: 30
- minimum-idle: 10
- maximum-pool-size: 30
- logging:
- level:
- com.acheron: ERROR
- pattern:
- file: '%d{yyyy-MM-dd HH:mm:ss} {%thread} %-5level %logger{36} -%msg%n'
- file: application
- path: ./logs
- queueScheduleData:
- createUser: User1
- status: Success
- modifiedUser: User1
- status:
- success: Success
- completed: COMPLETED
Add Comment
Please, Sign In to add comment