Advertisement
richieriviere

Spring Batch With Partitioner

Aug 3rd, 2016
479
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 10.91 KB | None | 0 0
  1.  
  2. @Configuration
  3. public class ExportMasterListCsvJobConfig {
  4.    
  5.     public static final String JOB_NAME = "exportMasterListCsv";
  6.  
  7.     @Autowired
  8.     public JobBuilderFactory jobBuilderFactory;
  9.  
  10.     @Autowired
  11.     public StepBuilderFactory stepBuilderFactory;
  12.  
  13.     /**
  14.      * Create object representing the master list file.
  15.      *
  16.      * @param outDir
  17.      * @param jobId
  18.      * @return
  19.      * @throws IOException
  20.      */
  21.     @Bean
  22.     @JobScope
  23.     public FileSystemResource masterListFile(
  24.             @Value("${out.dir}") String outDir,
  25.             @Value("#{jobExecution.id}") Long jobId
  26.             ) throws IOException {
  27.        
  28.         return new FileSystemResource(new File(outDir, "masterList" + jobId + ".csv"));
  29.     }
  30.    
  31.     /**
  32.      * Create job to export the master list from the online pricing staging db.
  33.      *
  34.      * @param readStgDbAndExportMasterListStep read online staging db step and export to csv
  35.      * @return job to export the master list from the online pricing staging db.
  36.      */
  37.     @Bean
  38.     public Job exportMasterListCsvJob(@Qualifier("partitionStep") Step partitionStep) {
  39.  
  40.         return jobBuilderFactory.get(JOB_NAME)
  41.                 .incrementer(new RunIdIncrementer())
  42.                 .flow(partitionStep)
  43.                 .end()
  44.                 .build();
  45.     }
  46.    
  47.     @Bean
  48.      public Step partitionStep(
  49.              @Qualifier("readStgDbAndExportMasterListStep") Step readStgDbAndExportMasterListStep,
  50.              @Qualifier("taskExecutor") TaskExecutor taskExecutor,
  51.              @Qualifier("partitioner") Partitioner partitioner){
  52.        
  53.         return stepBuilderFactory.get("partitionStep")
  54.            .partitioner(readStgDbAndExportMasterListStep)
  55.            .partitioner("readStgDbAndExportMasterListStep", partitioner)
  56.            .taskExecutor(taskExecutor)
  57.            .build();
  58.      }
  59.    
  60.     @Bean
  61.     @JobScope
  62.     public Partitioner partitioner(){
  63.         return new DivisionPartitioner();
  64.     }
  65.    
  66.     /**
  67.      * Read from online pricing staging db.
  68.      *
  69.      * @param chunkSize
  70.      * @param queryOnlineStagingDbReaderForMasterList
  71.      * @param masterListOutputProcessor
  72.      * @param masterListFileWriter
  73.      * @return step to read from online pricing staging db.
  74.      */
  75.     @Bean
  76.     @JobScope
  77.     public Step readStgDbAndExportMasterListStep(
  78.             @Value("${exportMasterListCsv.generateMasterListRows.chunkSize}") int chunkSize,
  79.             @Qualifier("queryOnlineStagingDbReaderForMasterList") ItemReader<MasterList> queryOnlineStagingDbReaderForMasterList,
  80.             @Qualifier("masterListOutputProcessor") ItemProcessor<MasterList,MasterList> masterListOutputProcessor,
  81.             @Qualifier("masterListFileWriter") ItemWriter<MasterList> masterListFileWriter) {
  82.  
  83.         return  stepBuilderFactory.get("readStgDbAndExportMasterListStep")
  84.                     .<MasterList,MasterList>chunk(chunkSize)
  85.                     .reader(queryOnlineStagingDbReaderForMasterList)
  86.                     .processor(masterListOutputProcessor)
  87.                     .writer(masterListFileWriter)
  88.                     .build();
  89.                    
  90.     }  
  91.      
  92.    
  93.     @Bean
  94.     @JobScope
  95.     public TaskExecutor taskExecutor() {
  96.       return new SimpleAsyncTaskExecutor();
  97.     }
  98.    
  99.     /**
  100.      * Query and map rows from online pricing staging db into a master list object.
  101.      *
  102.      * @param onlineStagingDb
  103.      * @param masterListSql
  104.      * @return
  105.      */
  106.     @Bean
  107.     @StepScope
  108.     public ItemReader<MasterList> queryOnlineStagingDbReaderForMasterList(
  109.             DataSource onlineStagingDb,
  110.             @Value("${exportMasterListCsv.generateMasterListRows.masterListSql}") String masterListSql) {
  111.  
  112.         JdbcCursorItemReader<MasterList> reader = new JdbcCursorItemReader<>();
  113.        
  114.         reader.setDataSource(onlineStagingDb);
  115.         reader.setSql(masterListSql);  
  116.        
  117.         reader.setPreparedStatementSetter(new PreparedStatementSetter() {
  118.             public void setValues(PreparedStatement ps) throws SQLException {
  119.                 ps.setInt(1, 45);
  120.             }
  121.         });
  122.         reader.setRowMapper(new RowMapper<MasterList>() {
  123.             @Override
  124.             public MasterList mapRow(ResultSet resultSet, int i) throws SQLException {
  125.                 MasterList masterList = new MasterList();
  126.                 masterList.setL1(resultSet.getString(DaoConstants.COLUMN_HEADER_LEVEL_ONE));
  127.                 masterList.setL2(resultSet.getString(DaoConstants.COLUMN_HEADER_LEVEL_TWO));
  128.                 masterList.setL2Name(resultSet.getString(DaoConstants.COLUMN_HEADER_LEVEL_TWO_NAME));
  129.                 masterList.setBrand(resultSet.getString(DaoConstants.COLUMN_HEADER_BRAND));
  130.                 masterList.setDivisionId(resultSet.getInt(DaoConstants.COLUMN_HEADER_DIVISION_ID));
  131.                 masterList.setSellingUnitRetail(resultSet.getDouble(DaoConstants.COLUMN_HEADER_SELLING_UNIT_RETAIL));
  132.                 masterList.setClearanceInd(resultSet.getBoolean(DaoConstants.COLUMN_HEADER_CLEARANCE_IND));
  133.                 masterList.setPromoRetail(resultSet.getDouble(DaoConstants.COLUMN_HEADER_PROMO_RETAIL));
  134.                 masterList.setPromoId(resultSet.getInt(DaoConstants.COLUMN_HEADER_PROMO_ID));
  135.                 masterList.setPromoName(resultSet.getString(DaoConstants.COLUMN_HEADER_PROMO_NAME));
  136.                 masterList.setPromoCompId(resultSet.getInt(DaoConstants.COLUMN_HEADER_PROMO_COMP_ID));
  137.                 masterList.setPromoCompName(resultSet.getString(DaoConstants.COLUMN_HEADER_PROMO_COMP_NAME));
  138.                 masterList.setRpmPromoCompDetailId(resultSet.getInt(DaoConstants.COLUMN_HEADER_RPM_PROMO_COMP_DETAIL_ID));
  139.                 masterList.setRpmPromoCompDetailState(resultSet.getString(DaoConstants.COLUMN_HEADER_RPM_PROMO_COMP_DETAIL_STATE));
  140.                
  141.                 Timestamp startDateTs = resultSet.getTimestamp(DaoConstants.COLUMN_HEADER_RPM_PROMO_COMP_DETAIL_START_DATE);
  142.                 java.util.Date startDate = null;
  143.                 if (startDateTs != null){
  144.                     startDate = new java.util.Date(startDateTs.getTime());
  145.                 }
  146.                 masterList.setRpmPromoCompDetailStartDate(startDate);
  147.  
  148.                 Timestamp endDateTs = resultSet.getTimestamp(DaoConstants.COLUMN_HEADER_RPM_PROMO_COMP_DETAIL_END_DATE);
  149.                 java.util.Date endDate = null;
  150.                 if (endDateTs != null){
  151.                     endDate = new java.util.Date(endDateTs.getTime());
  152.                 }
  153.                 masterList.setRpmPromoCompDetailEndDate(endDate);
  154.  
  155.                 return masterList;
  156.             }
  157.         });        
  158.         return reader;
  159.     }  
  160.    
  161.     /**
  162.      * Do some extra processing on master list rows.
  163.      *
  164.      * @return master list row with transformations applied.
  165.      */
  166.     @Bean
  167.     @JobScope
  168.     public ItemProcessor<MasterList,MasterList> masterListOutputProcessor() {
  169.  
  170.         return new ItemProcessor<MasterList, MasterList>() {
  171.            
  172.             @Override
  173.             public MasterList process(MasterList masterList) throws Exception {
  174.                 masterList.setPromoIdPromoCompId(masterList.getPromoId() + " / " + masterList.getPromoCompId());
  175.                 return masterList;
  176.             }
  177.         };
  178.     }
  179.    
  180.     /**
  181.      * Write out the the masterlist.csv
  182.      *
  183.      * @param masterListFile
  184.      * @param executionContext
  185.      * @return
  186.      */
  187.     @Bean
  188.     @StepScope
  189.     public ItemWriter<MasterList> masterListFileWriter(
  190.             FileSystemResource masterListFile,
  191.             @Value("#{stepExecutionContext}")Map<String, Object> executionContext) {
  192.        
  193.         FlatFileItemWriter<MasterList> writer = new FlatFileItemWriter<>();
  194.         writer.setResource(masterListFile);
  195.         writer.setHeaderCallback(new FlatFileHeaderCallback() {
  196.  
  197.             public void writeHeader(Writer writer) throws IOException {
  198.                
  199.                 StringBuilder builder = new StringBuilder();
  200.  
  201.                 builder.append(FlatFileConstants.MSTRLST_COLUMN_HEADER_LEVEL_ONE + FlatFileConstants.OUTPUT_FILE_DELIM);
  202.                 builder.append(FlatFileConstants.MSTRLST_COLUMN_HEADER_LEVEL_TWO + FlatFileConstants.OUTPUT_FILE_DELIM);
  203.                 builder.append(FlatFileConstants.MSTRLST_COLUMN_HEADER_CLEARANCE + FlatFileConstants.OUTPUT_FILE_DELIM);
  204.                 builder.append(FlatFileConstants.MSTRLST_COLUMN_HEADER_WAS + FlatFileConstants.OUTPUT_FILE_DELIM);
  205.                 builder.append(FlatFileConstants.MSTRLST_COLUMN_HEADER_NOW + FlatFileConstants.OUTPUT_FILE_DELIM);
  206.                 builder.append(FlatFileConstants.MSTRLST_COLUMN_HEADER_PERCENT_OFF + FlatFileConstants.OUTPUT_FILE_DELIM);
  207.                 builder.append(FlatFileConstants.MSTRLST_COLUMN_HEADER_BRAND + FlatFileConstants.OUTPUT_FILE_DELIM);
  208.                 builder.append(FlatFileConstants.MSTRLST_COLUMN_HEADER_PRODUCT + FlatFileConstants.OUTPUT_FILE_DELIM);
  209.                 builder.append(FlatFileConstants.MSTRLST_COLUMN_HEADER_OFFER + FlatFileConstants.OUTPUT_FILE_DELIM);
  210.                 builder.append(FlatFileConstants.MSTRLST_COLUMN_HEADER_MARKETING_STATEMENT + FlatFileConstants.OUTPUT_FILE_DELIM);
  211.                 builder.append(FlatFileConstants.MSTRLST_COLUMN_HEADER_DINKUS + FlatFileConstants.OUTPUT_FILE_DELIM);
  212.                 builder.append(FlatFileConstants.MSTRLST_COLUMN_HEADER_PROMO_ID + FlatFileConstants.OUTPUT_FILE_DELIM); // really promo comp id
  213.                 builder.append(FlatFileConstants.MSTRLST_COLUMN_HEADER_PROMO_PROMO_COMPONENT_ID + FlatFileConstants.OUTPUT_FILE_DELIM);
  214.                 builder.append(FlatFileConstants.MSTRLST_COLUMN_HEADER_PROMO_PROMO_START_DATE + FlatFileConstants.OUTPUT_FILE_DELIM);  
  215.                 builder.append(FlatFileConstants.MSTRLST_COLUMN_HEADER_PROMO_PROMO_END_DATE);  
  216.                 writer.write(builder.toString());
  217.             }
  218.         });
  219.        
  220.         FormatterLineAggregator<MasterList> lineAggregator = new FormatterLineAggregator<>();
  221.         lineAggregator.setFormat(
  222.                 "%s" + FlatFileConstants.OUTPUT_FILE_DELIM +                    // l1
  223.                 "%s" + FlatFileConstants.OUTPUT_FILE_DELIM +                    // l2
  224.                 "%s" + FlatFileConstants.OUTPUT_FILE_DELIM +                    // clearance ind
  225.                 "%.2f" + FlatFileConstants.OUTPUT_FILE_DELIM +                  // was
  226.                 "%.2f" + FlatFileConstants.OUTPUT_FILE_DELIM +                  // now
  227.                 "%.0f" + FlatFileConstants.OUTPUT_FILE_DELIM +                  // percent off
  228.                 "%s" + FlatFileConstants.OUTPUT_FILE_DELIM +                    // brand
  229.                 "%s" + FlatFileConstants.OUTPUT_FILE_DELIM +                    // product name
  230.                 "%s" + FlatFileConstants.OUTPUT_FILE_DELIM +                    // offer name
  231.                 "%s" + FlatFileConstants.OUTPUT_FILE_DELIM +                    // marketing statement
  232.                 "%s" + FlatFileConstants.OUTPUT_FILE_DELIM +                    // dinkus  
  233.                 "%d" + FlatFileConstants.OUTPUT_FILE_DELIM +                    // promo comp id
  234.                 "%s"+ FlatFileConstants.OUTPUT_FILE_DELIM +                     // promo promo comp id
  235.                 "%te/%<tm/%<tY %<tT" + FlatFileConstants.OUTPUT_FILE_DELIM +    // start date
  236.                 "%te/%<tm/%<tY %<tT");                                          // end date
  237.         writer.setLineAggregator(lineAggregator);
  238.        
  239.         BeanWrapperFieldExtractor<MasterList> extractor = new BeanWrapperFieldExtractor<MasterList>();
  240.         extractor.setNames(new String[] {
  241.                 "l1",
  242.                 "l2",
  243.                 "clearanceInd",
  244.                 "sellingUnitRetail",
  245.                 "promoRetail",
  246.                 "percentOff",
  247.                 "brand",
  248.                 "l2Name",
  249.                 "promoCompName",
  250.                 "marketingStatement",
  251.                 "dinkus",
  252.                 "promoCompId",
  253.                 "promoIdPromoCompId",
  254.                 "rpmPromoCompDetailStartDate",
  255.                 "rpmPromoCompDetailEndDate"});
  256.         lineAggregator.setFieldExtractor(extractor);
  257.        
  258.         writer.setForceSync(true);
  259.         writer.open(new ExecutionContext(executionContext));
  260.         return writer;
  261.  
  262.     }  
  263.    
  264. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement