Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- @Primary
- @Bean(name = "mysqlDs")
- @ConfigurationProperties(prefix = "datasource.sql.jobMetaDataDb")
- public DataSource sqlDataSource() {
- return DataSourceBuilder.create().build();
- }
- @Bean(name = "mysql")
- @Autowired
- public JdbcTemplate slaveJdbcTemplate(@Qualifier("mysqlDs") DataSource mysqlDs) {
- return new JdbcTemplate(mysqlDs);
- }
- @Bean(name = "rdsDataSource")
- @ConfigurationProperties(prefix = "datasource.sql.rdsWriterDb")
- public DataSource rdsDataSource() {
- return DataSourceBuilder.create().build();
- }
- @Bean(name = "rdsJdbcTemplate")
- @Autowired
- public JdbcTemplate rdsJdbcTemplate(@Qualifier("rdsDataSource") DataSource rdsDataSource) {
- return new JdbcTemplate(rdsDataSource);
- }
- #Mysql db endpoint configurations for
- #spring job metadata.
- datasource.sql.jobMetaDataDb.url=jdbc:mysql://localhost/jobMetadata?characterEncoding=UTF-8
- datasource.sql.jobMetaDataDb.username=root
- datasource.sql.jobMetaDataDb.password=root
- datasource.sql.jobMetaDataDb.driverClassName=com.mysql.jdbc.Driver
- datasource.sql.jobMetaDataDb.jmx-enabled=true
- #Configuration to avoid wait_timeout
- package com.fastretailing.catalogPlatformSCMProducer.producerjob.writer.rds;
- import com.fastretailing.catalogPlatformSCMProducer.constants.ProducerJobConstants;
- import com.fastretailing.catalogPlatformSCMProducer.model.Configuration;
- import com.fastretailing.catalogPlatformSCMProducer.model.ProducerMessage;
- import com.fastretailing.catalogPlatformSCMProducer.model.RDSColumnInfo;
- import com.fastretailing.catalogPlatformSCMProducer.notification.JobStatus;
- import com.fastretailing.catalogPlatformSCMProducer.util.ProducerUtil;
- import com.fastretailing.catalogPlatformSCMProducer.util.QueryGenerator;
- import org.apache.commons.lang3.exception.ExceptionUtils;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.batch.item.support.AbstractItemStreamItemWriter;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.dao.EmptyResultDataAccessException;
- import org.springframework.jdbc.core.JdbcTemplate;
- import org.springframework.transaction.annotation.Transactional;
- import java.sql.BatchUpdateException;
- import java.sql.Statement;
- import java.util.ArrayList;
- import java.util.List;
- /**
- * Producer Job writer for writing directly to RDS.
- */
- public class RdsWriter extends AbstractItemStreamItemWriter<ProducerMessage> {
- @Autowired
- @Qualifier("rdsJdbcTemplate")
- JdbcTemplate rdsJdbcTemplate;
- @Autowired
- Configuration configuration;
- QueryGenerator queryGenerator;
- private final Logger LOGGER = LoggerFactory.getLogger(this.getClass());
- @Override
- public void write(List<? extends ProducerMessage> list) throws Exception {
- handleRecord(list);
- }
- @Transactional
- public void handleRecord(List<? extends ProducerMessage> list) {
- List<Object[]> objectList_insert = new ArrayList<>();
- List<Object> insertObject = null;
- String tableName = null;
- for (ProducerMessage message : list) {
- try {
- //Creating query generator once for job.
- insertObject = new ArrayList<>();
- if (null == queryGenerator) {
- queryGenerator = new QueryGenerator(message);
- }
- if (null == tableName)
- tableName = message.getTableName();
- String timestampValidationPS = ProducerUtil.generateTimestampCheckPS(message);
- Long returnValue;
- try {
- returnValue = rdsJdbcTemplate.queryForObject(timestampValidationPS,
- ProducerUtil.generatePrimaryKeyObjectList(message), Long.class);
- } catch (EmptyResultDataAccessException e) {
- LOGGER.debug("Primary key not exists in RDS table. This will insert new row");
- returnValue = null;
- }
- if (null == returnValue || returnValue <= message.getTimeStamp()) {
- for (RDSColumnInfo columnInfo : message.getRecord()) {
- //Adding null value insertion in case of non varchar type and empty value in CSV.
- if(columnInfo.getRdsColumnValue().isEmpty() && !columnInfo.getRdsVarType().equalsIgnoreCase(ProducerJobConstants.TYPE_VARCHAR)){
- insertObject.add(null);
- }else {
- insertObject.add(columnInfo.getRdsColumnValue());
- }
- }
- objectList_insert.add(insertObject.toArray());
- } else {
- JobStatus.addRowsSkippedWriting(1);
- LOGGER.debug("Skipped row due to timestamp check failure for feedName {}",
- message.getFeedConfigName());
- }
- } catch (Exception e) {
- JobStatus.changeStatus(ProducerUtil.SNS_NOTIFICATION_EVENT_IN_COMPLETE);
- JobStatus.addExceptionInLogWriter(ExceptionUtils.getStackTrace(e));
- JobStatus.addRowsSkippedWriting(1);
- LOGGER.error("Exception while processing records for RDS write. These records will be skipped from writing.",
- e);
- }
- }
- try {
- if (objectList_insert != null) {
- String insertQuery = queryGenerator.generateRdsInsertPS(tableName);
- LOGGER.debug("Executing Query {}", insertQuery);
- rdsJdbcTemplate.batchUpdate(insertQuery, objectList_insert);
- JobStatus.addRowsWritten(objectList_insert.size() );
- }
- } catch (Exception e) {
- //Handling batchUpdateException for rows written count.
- if(ExceptionUtils.indexOfThrowable(e,BatchUpdateException.class ) != -1){
- BatchUpdateException be = (BatchUpdateException) e.getCause();
- handleUpdateCountOnException(be);
- }
- JobStatus.changeStatus(ProducerUtil.SNS_NOTIFICATION_EVENT_IN_COMPLETE);
- JobStatus.addExceptionInLogWriter(ExceptionUtils.getStackTrace(e));
- LOGGER.error("Exception while writing records to RDS table. These records will be skipped from writing.",
- e);
- }
- }
- private void handleUpdateCountOnException(BatchUpdateException be){
- for (int count : be.getUpdateCounts()){
- if (count == Statement.EXECUTE_FAILED) {
- JobStatus.addRowsSkippedWriting(1);
- }else {
- JobStatus.addRowsWritten(1);
- }
- }
- }
- }
- datasource.sql.jobMetaDataDb.testWhileIdle = true
- datasource.sql.jobMetaDataDb.timeBetweenEvictionRunsMillis = 7200000
- datasource.sql.jobMetaDataDb.validationQuery = SELECT 1
- #Database configuration for RdsWriter.
- datasource.sql.rdsWriterDb.url=jdbc:mysql://localhost/rds?characterEncoding=UTF-8
- datasource.sql.rdsWriterDb.username=root
- datasource.sql.rdsWriterDb.password=root
- datasource.sql.rdsWriterDb.driverClassName=com.mysql.jdbc.Driver
- #Configuration to avoid wait_timeout
- datasource.sql.rdsWriterDb.testWhileIdle = true
- datasource.sql.rdsWriterDb.timeBetweenEvictionRunsMillis = 7200000
- datasource.sql.rdsWriterDb.validationQuery = SELECT 1
- datasource.sql.rdsWriterDb.jmx-enabled=true
- spring.datasource.jmx-enabled=true
- package com.fastretailing.catalogPlatformSCMProducer.producerjob.writer.rds;
- import com.fastretailing.catalogPlatformSCMProducer.constants.ProducerJobConstants;
- import com.fastretailing.catalogPlatformSCMProducer.model.Configuration;
- import com.fastretailing.catalogPlatformSCMProducer.model.ProducerMessage;
- import com.fastretailing.catalogPlatformSCMProducer.model.RDSColumnInfo;
- import com.fastretailing.catalogPlatformSCMProducer.notification.JobStatus;
- import com.fastretailing.catalogPlatformSCMProducer.util.ProducerUtil;
- import com.fastretailing.catalogPlatformSCMProducer.util.QueryGenerator;
- import org.apache.commons.lang3.exception.ExceptionUtils;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.batch.item.support.AbstractItemStreamItemWriter;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.dao.EmptyResultDataAccessException;
- import org.springframework.jdbc.core.JdbcTemplate;
- import org.springframework.transaction.annotation.Transactional;
- import java.sql.BatchUpdateException;
- import java.sql.Statement;
- import java.util.ArrayList;
- import java.util.List;
- /**
- * Producer Job writer for writing directly to RDS.
- */
- public class RdsWriter extends AbstractItemStreamItemWriter<ProducerMessage> {
- @Autowired
- @Qualifier("rdsJdbcTemplate")
- JdbcTemplate rdsJdbcTemplate;
- @Autowired
- Configuration configuration;
- QueryGenerator queryGenerator;
- private final Logger LOGGER = LoggerFactory.getLogger(this.getClass());
- @Override
- public void write(List<? extends ProducerMessage> list) throws Exception {
- handleRecord(list);
- }
- @Transactional
- public void handleRecord(List<? extends ProducerMessage> list) {
- List<Object[]> objectList_insert = new ArrayList<>();
- List<Object> insertObject = null;
- String tableName = null;
- for (ProducerMessage message : list) {
- try {
- //Creating query generator once for job.
- insertObject = new ArrayList<>();
- if (null == queryGenerator) {
- queryGenerator = new QueryGenerator(message);
- }
- if (null == tableName)
- tableName = message.getTableName();
- String timestampValidationPS = ProducerUtil.generateTimestampCheckPS(message);
- Long returnValue;
- try {
- returnValue = rdsJdbcTemplate.queryForObject(timestampValidationPS,
- ProducerUtil.generatePrimaryKeyObjectList(message), Long.class);
- } catch (EmptyResultDataAccessException e) {
- LOGGER.debug("Primary key not exists in RDS table. This will insert new row");
- returnValue = null;
- }
- if (null == returnValue || returnValue <= message.getTimeStamp()) {
- for (RDSColumnInfo columnInfo : message.getRecord()) {
- //Adding null value insertion in case of non varchar type and empty value in CSV.
- if(columnInfo.getRdsColumnValue().isEmpty() && !columnInfo.getRdsVarType().equalsIgnoreCase(ProducerJobConstants.TYPE_VARCHAR)){
- insertObject.add(null);
- }else {
- insertObject.add(columnInfo.getRdsColumnValue());
- }
- }
- objectList_insert.add(insertObject.toArray());
- } else {
- JobStatus.addRowsSkippedWriting(1);
- LOGGER.debug("Skipped row due to timestamp check failure for feedName {}",
- message.getFeedConfigName());
- }
- } catch (Exception e) {
- JobStatus.changeStatus(ProducerUtil.SNS_NOTIFICATION_EVENT_IN_COMPLETE);
- JobStatus.addExceptionInLogWriter(ExceptionUtils.getStackTrace(e));
- JobStatus.addRowsSkippedWriting(1);
- LOGGER.error("Exception while processing records for RDS write. These records will be skipped from writing.",
- e);
- }
- }
- try {
- if (objectList_insert != null) {
- String insertQuery = queryGenerator.generateRdsInsertPS(tableName);
- LOGGER.debug("Executing Query {}", insertQuery);
- rdsJdbcTemplate.batchUpdate(insertQuery, objectList_insert);
- JobStatus.addRowsWritten(objectList_insert.size() );
- }
- } catch (Exception e) {
- //Handling batchUpdateException for rows written count.
- if(ExceptionUtils.indexOfThrowable(e,BatchUpdateException.class ) != -1){
- BatchUpdateException be = (BatchUpdateException) e.getCause();
- handleUpdateCountOnException(be);
- }
- JobStatus.changeStatus(ProducerUtil.SNS_NOTIFICATION_EVENT_IN_COMPLETE);
- JobStatus.addExceptionInLogWriter(ExceptionUtils.getStackTrace(e));
- LOGGER.error("Exception while writing records to RDS table. These records will be skipped from writing.",
- e);
- }
- }
- private void handleUpdateCountOnException(BatchUpdateException be){
- for (int count : be.getUpdateCounts()){
- if (count == Statement.EXECUTE_FAILED) {
- JobStatus.addRowsSkippedWriting(1);
- }else {
- JobStatus.addRowsWritten(1);
- }
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement