Guest User

Untitled

a guest
Dec 28th, 2017
155
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 10.18 KB | None | 0 0
  1. package com.acheron.batch.config;
  2.  
  3. import java.util.List;
  4.  
  5. import javax.sql.DataSource;
  6.  
  7. import org.springframework.batch.core.Job;
  8. import org.springframework.batch.core.Step;
  9. import
  10. org.springframework.batch.core.configuration.
  11. annotation.EnableBatchProcessing;
  12. import org.springframework.batch.core.
  13. configuration.annotation.JobBuilderFactory;
  14. import org.springframework.batch.
  15. core.configuration.annotation.StepBuilderFactory;
  16. import org.springframework.batch.
  17. core.configuration.annotation.StepScope;
  18. import org.springframework.batch.core.explore.JobExplorer;
  19. import org.springframework.batch.core.
  20. explore.support.MapJobExplorerFactoryBean;
  21. import org.springframework.batch.core.launch.support.RunIdIncrementer;
  22. import org.springframework.batch.core.launch.support.SimpleJobLauncher;
  23. import org.springframework.batch.core.repository.JobRepository;
  24. import org.springframework.batch.core.
  25. repository.support.MapJobRepositoryFactoryBean;
  26. import org.springframework.batch.item.ItemReader;
  27. import org.springframework.batch.item.support.IteratorItemReader;
  28. import org.springframework.batch.
  29. support.transaction.ResourcelessTransactionManager;
  30. import org.springframework.beans.factory.annotation.Autowired;
  31. import org.springframework.beans.factory.annotation.Qualifier;
  32. import org.springframework.beans.factory.annotation.Value;
  33. import org.springframework.context.annotation.Bean;
  34. import org.springframework.context.annotation.Configuration;
  35. import org.springframework.core.env.Environment;
  36. import org.springframework.core.task.TaskExecutor;
  37. import org.springframework.dao.DeadlockLoserDataAccessException;
  38. import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
  39. import org.springframework.transaction.PlatformTransactionManager;
  40. import com.acheron.batch.vo.Contents;
  41. import com.acheron.batch.vo.Schedules;
  42.  
  43. @Configuration
  44. @EnableBatchProcessing
  45.  
  46. public class BatchConfiguaration {
  47. @Autowired
  48. private DataSource datasouce;
  49.  
  50. @Autowired
  51. private JobBuilderFactory jobBuilderFactory;
  52.  
  53. @Autowired
  54. private StepBuilderFactory stepBuilderFactory;
  55.  
  56. @Autowired
  57. public Environment env;
  58.  
  59. @Bean(name = "reader")
  60. @StepScope
  61. public ItemReader<Schedules> reader(@Value("#{stepExecutionContext[scheduleRecs]}") List<Schedules> scherecs) {
  62. ItemReader<Schedules> reader = new IteratorItemReader<Schedules>(scherecs);
  63. return reader;
  64. }
  65.  
  66. @Bean(name = "CWSreader")
  67. @StepScope
  68. public ItemReader<Contents> CWSreader(@Value("#{stepExecutionContext[scheduleRecs]}") List<Contents> scherecs) {
  69. ItemReader<Contents> reader = new IteratorItemReader<Contents>(scherecs);
  70. return reader;
  71. }
  72.  
  73. @SuppressWarnings("rawtypes")
  74. @Bean
  75. @StepScope
  76. public BatchProcessor processor() {
  77. return new BatchProcessor();
  78. }
  79.  
  80. @SuppressWarnings("rawtypes")
  81. @Bean
  82. @StepScope
  83. public ContentWorkflowBatchProcessor CWSprocessor() {
  84. return new ContentWorkflowBatchProcessor();
  85. }
  86.  
  87.  
  88. @Bean(name = "batchSchedulePreparedStatement")
  89. @StepScope
  90. public BatchSchedulePreparedStatement batchSchedulePreparedStatement() {
  91. return new BatchSchedulePreparedStatement();
  92. }
  93.  
  94. @Bean(name = "contentWorkflowPreparedStatement")
  95. @StepScope
  96. public ContentWorkflowPreparedStatement contentWorkflowPreparedStatement() {
  97. return new ContentWorkflowPreparedStatement();
  98. }
  99.  
  100. @Bean(name = "ContentWorkflowBatchWriter")
  101. @StepScope
  102. public ContentWorkflowBatchWriter ContentWorkflowBatchWriter() {
  103. return new ContentWorkflowBatchWriter();
  104. }
  105.  
  106. @SuppressWarnings({ "rawtypes", "unchecked" })
  107. @Bean(name = "batchWriter")
  108. @StepScope
  109. public BatchWriter batchWriter() {
  110. BatchWriter batchWriter = new BatchWriter();
  111. batchWriter.setDataSource(datasouce);
  112. batchWriter.setSql(env.getProperty("batch.insert.schedule.query"));
  113. batchWriter.setItemPreparedStatementSetter(batchSchedulePreparedStatement());
  114. return batchWriter;
  115.  
  116. }
  117.  
  118. @SuppressWarnings({ "rawtypes", "unchecked" })
  119. @Bean(name = "CWSbatchWriter")
  120. @StepScope
  121. public ContentWorkflowBatchWriter CWSbatchWriter() {
  122. ContentWorkflowBatchWriter contentWorkflowBatchWriter = new ContentWorkflowBatchWriter();
  123. contentWorkflowBatchWriter.setDataSource(datasouce);
  124. contentWorkflowBatchWriter.setSql(env.getProperty("batch.insert.contentworkflow.query"));
  125. contentWorkflowBatchWriter.setItemPreparedStatementSetter(contentWorkflowPreparedStatement());
  126. return contentWorkflowBatchWriter;
  127.  
  128. }
  129.  
  130. @Bean("acheronDbTm")
  131. @Qualifier("acheronDbTm")
  132. public PlatformTransactionManager platformTransactionManager() {
  133. return new ResourcelessTransactionManager();
  134. }
  135.  
  136. @Bean
  137. public JobExplorer jobExplorer() throws Exception {
  138. MapJobExplorerFactoryBean explorerFactoryBean = new MapJobExplorerFactoryBean();
  139. explorerFactoryBean.setRepositoryFactory(mapJobRepositoryFactoryBean());
  140. explorerFactoryBean.afterPropertiesSet();
  141. return explorerFactoryBean.getObject();
  142. }
  143.  
  144. @Bean
  145. public MapJobRepositoryFactoryBean mapJobRepositoryFactoryBean() {
  146. MapJobRepositoryFactoryBean mapJobRepositoryFactoryBean = new MapJobRepositoryFactoryBean();
  147. mapJobRepositoryFactoryBean.setTransactionManager(platformTransactionManager());
  148. return mapJobRepositoryFactoryBean;
  149. }
  150.  
  151. @Bean
  152. public JobRepository jobRepository() throws Exception {
  153. return mapJobRepositoryFactoryBean().getObject();
  154. }
  155.  
  156. @Bean
  157. public SimpleJobLauncher jobLauncher() throws Exception {
  158. SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
  159. jobLauncher.setJobRepository(jobRepository());
  160. return jobLauncher;
  161. }
  162.  
  163. @Bean(name = "batchPartition")
  164. @StepScope
  165. public BatchPartition batchPartition() {
  166. BatchPartition batchPartition = new BatchPartition();
  167. return batchPartition;
  168. }
  169.  
  170. @Bean(name = "contentWorkflowPartition")
  171. @StepScope
  172. public ContentWorkflowPartition ContentWorkflowPartition() {
  173. ContentWorkflowPartition contentWorkflowPartition = new ContentWorkflowPartition();
  174. return contentWorkflowPartition;
  175. }
  176.  
  177. @Bean(name="taskExecutor")
  178. public TaskExecutor taskExecutor() {
  179. ThreadPoolTaskExecutor poolTaskExecutor = new ThreadPoolTaskExecutor();
  180. poolTaskExecutor.setCorePoolSize(10);
  181. poolTaskExecutor.setMaxPoolSize(30);
  182. poolTaskExecutor.setQueueCapacity(35);
  183. poolTaskExecutor.setThreadNamePrefix("Acheron");
  184. poolTaskExecutor.afterPropertiesSet();
  185. return poolTaskExecutor;
  186. }
  187.  
  188. @Bean(name="CWSTaskExecutor")
  189. public TaskExecutor CWSTaskExecutor() {
  190. ThreadPoolTaskExecutor poolTaskExecutor = new ThreadPoolTaskExecutor();
  191. poolTaskExecutor.setCorePoolSize(10);
  192. poolTaskExecutor.setMaxPoolSize(30);
  193. poolTaskExecutor.setQueueCapacity(35);
  194. poolTaskExecutor.setThreadNamePrefix("Acheron");
  195. poolTaskExecutor.afterPropertiesSet();
  196. return poolTaskExecutor;
  197. }
  198.  
  199. @Bean(name = "masterStep")
  200. public Step masterStep() {
  201. return stepBuilderFactory.get("masterStep").partitioner(slave()).partitioner("slave", batchPartition())
  202. .taskExecutor(taskExecutor()).build();
  203. }
  204.  
  205. @Bean(name = "CWSmasterStep")
  206. public Step CWSmasterStep() {
  207. return stepBuilderFactory.get("CWSmasterStep").partitioner(CWSslave())
  208. .partitioner("CWSslave", ContentWorkflowPartition()).taskExecutor(CWSTaskExecutor()).build();
  209. }
  210.  
  211. @Bean(name = "slave")
  212. public Step slave() {
  213. return stepBuilderFactory.get("slave").chunk(100).faultTolerant().retryLimit(2)
  214. .retry(DeadlockLoserDataAccessException.class).reader(reader(null)).processor(processor())
  215. .writer(batchWriter()).build();
  216.  
  217. }
  218.  
  219. @Bean(name = "CWSslave")
  220. public Step CWSslave() {
  221. return stepBuilderFactory.get("CWSslave").chunk(100).faultTolerant().retryLimit(2)
  222. .retry(DeadlockLoserDataAccessException.class).reader(CWSreader(null)).processor(processor())
  223. .writer(CWSbatchWriter()).build();
  224.  
  225. }
  226.  
  227. @Bean(name = "manageStagingScheduleMaster")
  228. public Job manageStagingScheduleMaster(final Step masterStep) throws Exception {
  229. return jobBuilderFactory.get("manageStagingScheduleMaster").preventRestart().incrementer(new RunIdIncrementer())
  230. .start(masterStep).build();
  231. @Bean(name = "manageContentWorkflowStaging")
  232. public Job manageContentWorkflowStaging(final Step CWSmasterStep) throws Exception {
  233. return jobBuilderFactory.get("manageContentWorkflowStaging").preventRestart()
  234. .incrementer(new RunIdIncrementer()).start(CWSmasterStep).build();
  235. }
  236. }`
  237.  
  238. rest:
  239. api1: http://localhost:8888/v1/daq/schedule?sourceId=7&endDate=11/01/2018&page=1&pageSize=2
  240. api2: https://api.myjson.com/bins/1gm3ef
  241. ContentWorkflow: http://localhost:8085/v1/cwdata/programcontentgap?&
  242. responseCount: 10000
  243. contentWorkflowType: ContentWorkflow
  244. batch.insert.schedule.query: 'insert into clouddataflow.STAGING_SCHEDULE_MASTER (SCHEDULE_ID,SOURCE_ID,SOURCE_NAME,AIRING_TYPE_ID,AIRING_TYPE_NAME,CREATED_DATE,CREATED_USER,MODIFIED_DATE,MODIFIED_USER)values(?,?,?,?,?,?,?,?,?)'
  245. batch.insert.contentworkflow.query: 'insert into clouddataflow.CONTENT_WORKFLOW_STAGING (CONTENT_WORKFLOW_STAGING_ID,PROGRAM_ID,MASTER_TITLE,ORIGINAL_TITLE,PROGRAM_TYPE_ID,PROGRAM_TYPE_NAME,SOURCE_GROUP_ID,SOURCE_GROUP_NAME,REGION,PROGRAM_LANGUAGE,CW_STATUS,COPY_CULTURE,GENRES,KEYWORDS,AIR_DATE_TIME,IS_PROCESSED,CREATED_USER,CREATED_DATE,MODIFIED_USER,MODIFIED_DATE)values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)'
  246. modifiedUser: admin
  247. threadCount: 10
  248. spring.acherondb.url: 'jdbc:mysql://localhost:3306/clouddataflow'
  249. spring.acherondb.uname: uname
  250. spring.acherondb.pwd: pwd
  251. spring.acherondb.dclassName: com.mysql.jdbc.Driver
  252. spring.acherondb.schema: clouddataflow
  253. spring.acherondb.showSql: true
  254. spring.jpa.hibernate.naming.physical-strategy: org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl
spring:
  batch:
    job:
      enabled: true
  datasource:
    type: com.zaxxer.hikari.HikariDataSource
    hikari:
      jdbc-url: jdbc:mysql://localhost:3306/clouddataflow
      username: uname
      password: pwd
      driver-class-name: com.mysql.jdbc.Driver
      connection-timeout: 300
      idle-timeout: 30
      minimum-idle: 10
      maximum-pool-size: 30
logging:
  level:
    com.acheron: ERROR
  pattern:
    file: '%d{yyyy-MM-dd HH:mm:ss} {%thread} %-5level %logger{36} -%msg%n'
  file: application
  path: ./logs
queueScheduleData:
  createUser: User1
  status: Success
  modifiedUser: User1
status:
  success: Success
  completed: COMPLETED
Add Comment
Please, Sign In to add comment