// Class declaration and imports are assumed here (Spring Boot 1.x package names);
// the original snippet showed only the bean methods.
import javax.sql.DataSource;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.jdbc.DataSourceBuilder;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.jdbc.core.JdbcTemplate;

@Configuration
public class DataSourceConfig {

    // Primary DataSource, bound to the datasource.sql.jobMetaDataDb.* properties
    // and used for the Spring Batch job metadata.
    @Primary
    @Bean(name = "mysqlDs")
    @ConfigurationProperties(prefix = "datasource.sql.jobMetaDataDb")
    public DataSource sqlDataSource() {
        return DataSourceBuilder.create().build();
    }

    @Bean(name = "mysql")
    @Autowired
    public JdbcTemplate slaveJdbcTemplate(@Qualifier("mysqlDs") DataSource mysqlDs) {
        return new JdbcTemplate(mysqlDs);
    }

    // Secondary DataSource, bound to the datasource.sql.rdsWriterDb.* properties.
    @Bean(name = "rdsDataSource")
    @ConfigurationProperties(prefix = "datasource.sql.rdsWriterDb")
    public DataSource rdsDataSource() {
        return DataSourceBuilder.create().build();
    }

    // JdbcTemplate injected into RdsWriter via @Qualifier("rdsJdbcTemplate").
    @Bean(name = "rdsJdbcTemplate")
    @Autowired
    public JdbcTemplate rdsJdbcTemplate(@Qualifier("rdsDataSource") DataSource rdsDataSource) {
        return new JdbcTemplate(rdsDataSource);
    }
}

#MySQL endpoint configuration for the Spring Batch job metadata database.
datasource.sql.jobMetaDataDb.url=jdbc:mysql://localhost/jobMetadata?characterEncoding=UTF-8
datasource.sql.jobMetaDataDb.username=root
datasource.sql.jobMetaDataDb.password=root
datasource.sql.jobMetaDataDb.driverClassName=com.mysql.jdbc.Driver
datasource.sql.jobMetaDataDb.jmx-enabled=true
#Configuration to avoid wait_timeout
datasource.sql.jobMetaDataDb.testWhileIdle=true
datasource.sql.jobMetaDataDb.timeBetweenEvictionRunsMillis=7200000
datasource.sql.jobMetaDataDb.validationQuery=SELECT 1

#Database configuration for RdsWriter.
datasource.sql.rdsWriterDb.url=jdbc:mysql://localhost/rds?characterEncoding=UTF-8
datasource.sql.rdsWriterDb.username=root
datasource.sql.rdsWriterDb.password=root
datasource.sql.rdsWriterDb.driverClassName=com.mysql.jdbc.Driver
#Configuration to avoid wait_timeout
datasource.sql.rdsWriterDb.testWhileIdle=true
datasource.sql.rdsWriterDb.timeBetweenEvictionRunsMillis=7200000
datasource.sql.rdsWriterDb.validationQuery=SELECT 1
datasource.sql.rdsWriterDb.jmx-enabled=true
spring.datasource.jmx-enabled=true

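The testWhileIdle, timeBetweenEvictionRunsMillis and validationQuery settings above are what keep idle pooled connections alive across MySQL's wait_timeout. Below is a minimal sketch to confirm the binding actually reached the pool, assuming the default Tomcat JDBC pool from spring-boot-starter-jdbc backs the DataSource built above; the PoolSettingsLogger class is hypothetical and not part of the original code.

import javax.sql.DataSource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

// Hypothetical helper: logs the pool settings bound from datasource.sql.rdsWriterDb.*,
// assuming DataSourceBuilder produced the default org.apache.tomcat.jdbc.pool.DataSource.
@Component
public class PoolSettingsLogger implements CommandLineRunner {

    private static final Logger LOGGER = LoggerFactory.getLogger(PoolSettingsLogger.class);

    @Autowired
    @Qualifier("rdsDataSource")
    private DataSource rdsDataSource;

    @Override
    public void run(String... args) {
        if (rdsDataSource instanceof org.apache.tomcat.jdbc.pool.DataSource) {
            org.apache.tomcat.jdbc.pool.DataSource pool =
                    (org.apache.tomcat.jdbc.pool.DataSource) rdsDataSource;
            LOGGER.info("validationQuery={}, testWhileIdle={}, timeBetweenEvictionRunsMillis={}",
                    pool.getValidationQuery(), pool.isTestWhileIdle(),
                    pool.getTimeBetweenEvictionRunsMillis());
        }
    }
}
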
package com.fastretailing.catalogPlatformSCMProducer.producerjob.writer.rds;

import com.fastretailing.catalogPlatformSCMProducer.constants.ProducerJobConstants;
import com.fastretailing.catalogPlatformSCMProducer.model.Configuration;
import com.fastretailing.catalogPlatformSCMProducer.model.ProducerMessage;
import com.fastretailing.catalogPlatformSCMProducer.model.RDSColumnInfo;
import com.fastretailing.catalogPlatformSCMProducer.notification.JobStatus;
import com.fastretailing.catalogPlatformSCMProducer.util.ProducerUtil;
import com.fastretailing.catalogPlatformSCMProducer.util.QueryGenerator;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.support.AbstractItemStreamItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.annotation.Transactional;

import java.sql.BatchUpdateException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;

/**
 * Producer job writer that writes directly to RDS.
 */
public class RdsWriter extends AbstractItemStreamItemWriter<ProducerMessage> {

    @Autowired
    @Qualifier("rdsJdbcTemplate")
    JdbcTemplate rdsJdbcTemplate;

    @Autowired
    Configuration configuration;

    QueryGenerator queryGenerator;
    private final Logger LOGGER = LoggerFactory.getLogger(this.getClass());

    @Override
    public void write(List<? extends ProducerMessage> list) throws Exception {
        handleRecord(list);
    }

    @Transactional
    public void handleRecord(List<? extends ProducerMessage> list) {
        List<Object[]> objectList_insert = new ArrayList<>();
        List<Object> insertObject = null;
        String tableName = null;

        for (ProducerMessage message : list) {
            try {
                insertObject = new ArrayList<>();
                // Create the query generator once per job.
                if (null == queryGenerator) {
                    queryGenerator = new QueryGenerator(message);
                }
                if (null == tableName) {
                    tableName = message.getTableName();
                }

                // Check the timestamp of any existing row with the same primary key.
                String timestampValidationPS = ProducerUtil.generateTimestampCheckPS(message);
                Long returnValue;
                try {
                    returnValue = rdsJdbcTemplate.queryForObject(timestampValidationPS,
                            ProducerUtil.generatePrimaryKeyObjectList(message), Long.class);
                } catch (EmptyResultDataAccessException e) {
                    LOGGER.debug("Primary key does not exist in the RDS table. A new row will be inserted.");
                    returnValue = null;
                }

                if (null == returnValue || returnValue <= message.getTimeStamp()) {
                    for (RDSColumnInfo columnInfo : message.getRecord()) {
                        // Insert null for non-varchar columns whose CSV value is empty.
                        if (columnInfo.getRdsColumnValue().isEmpty()
                                && !columnInfo.getRdsVarType().equalsIgnoreCase(ProducerJobConstants.TYPE_VARCHAR)) {
                            insertObject.add(null);
                        } else {
                            insertObject.add(columnInfo.getRdsColumnValue());
                        }
                    }
                    objectList_insert.add(insertObject.toArray());
                } else {
                    JobStatus.addRowsSkippedWriting(1);
                    LOGGER.debug("Skipped row due to timestamp check failure for feedName {}",
                            message.getFeedConfigName());
                }
            } catch (Exception e) {
                JobStatus.changeStatus(ProducerUtil.SNS_NOTIFICATION_EVENT_IN_COMPLETE);
                JobStatus.addExceptionInLogWriter(ExceptionUtils.getStackTrace(e));
                JobStatus.addRowsSkippedWriting(1);
                LOGGER.error("Exception while processing records for RDS write. These records will be skipped.", e);
            }
        }

        try {
            // The list is created above, so it is never null; only write when it has rows.
            if (!objectList_insert.isEmpty()) {
                String insertQuery = queryGenerator.generateRdsInsertPS(tableName);
                LOGGER.debug("Executing query {}", insertQuery);
                rdsJdbcTemplate.batchUpdate(insertQuery, objectList_insert);
                JobStatus.addRowsWritten(objectList_insert.size());
            }
        } catch (Exception e) {
            // If a BatchUpdateException is anywhere in the cause chain, use its update counts
            // to keep the written/skipped row counters accurate.
            int index = ExceptionUtils.indexOfThrowable(e, BatchUpdateException.class);
            if (index != -1) {
                BatchUpdateException be = (BatchUpdateException) ExceptionUtils.getThrowables(e)[index];
                handleUpdateCountOnException(be);
            }
            JobStatus.changeStatus(ProducerUtil.SNS_NOTIFICATION_EVENT_IN_COMPLETE);
            JobStatus.addExceptionInLogWriter(ExceptionUtils.getStackTrace(e));
            LOGGER.error("Exception while writing records to the RDS table. These records will be skipped.", e);
        }
    }

    private void handleUpdateCountOnException(BatchUpdateException be) {
        for (int count : be.getUpdateCounts()) {
            if (count == Statement.EXECUTE_FAILED) {
                JobStatus.addRowsSkippedWriting(1);
            } else {
                JobStatus.addRowsWritten(1);
            }
        }
    }
}
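
RdsWriter extends AbstractItemStreamItemWriter, so it plugs into a chunk-oriented Spring Batch step as the ItemWriter. A minimal wiring sketch follows, assuming the Spring Batch StepBuilderFactory API; the producerMessageReader bean and the chunk size are hypothetical placeholders, not part of the original code.

import com.fastretailing.catalogPlatformSCMProducer.model.ProducerMessage;
import com.fastretailing.catalogPlatformSCMProducer.producerjob.writer.rds.RdsWriter;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class RdsWriteStepConfig {

    // Register the writer as a bean so its @Autowired fields (rdsJdbcTemplate,
    // configuration) are injected by Spring.
    @Bean
    public RdsWriter rdsWriter() {
        return new RdsWriter();
    }

    // Chunk-oriented step: the reader bean is assumed to exist elsewhere;
    // its name and the chunk size are illustrative only.
    @Bean
    public Step rdsWriteStep(StepBuilderFactory stepBuilderFactory,
                             ItemReader<ProducerMessage> producerMessageReader,
                             RdsWriter rdsWriter) {
        return stepBuilderFactory.get("rdsWriteStep")
                .<ProducerMessage, ProducerMessage>chunk(1000)
                .reader(producerMessageReader)
                .writer(rdsWriter)
                .build();
    }
}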