Advertisement
Guest User

Untitled

a guest
Nov 17th, 2016
90
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.93 KB | None | 0 0
  1. // @flow
  2. // @author tyler
  3.  
  4. import mysql from 'mysql';
  5.  
  6. import type {IConfigReader} from '../ConfigReader';
  7.  
  8. export type TDataRow = *;
  9.  
  10. export interface ITransaction {
  11. query (query: string): Promise<TDataRow>;
  12. }
  13.  
  14. export interface IDataClient {
  15. /**
  16. * Query the data store for a specific result row
  17. * @param query {string}
  18. * @returns {Promise<TDataRow>}
  19. */
  20. query (query: string): Promise<TDataRow>;
  21.  
  22. /**
  23. * Provides an interface for transactions
  24. * @summary Automatically commits or rollback based on
  25. * included promise result. E.G. transaction(t => return t.query('blah'))
  26. * @param fn
  27. * @returns {Promise<*>}
  28. */
  29. transaction (fn: (t: ITransaction) => Promise<*>): Promise<*>;
  30. }
  31.  
  32. export class MySqlDataClient {
  33. _pool: *;
  34.  
  35. constructor (configReader: IConfigReader) {
  36. const {db} = configReader.get();
  37.  
  38. this._pool = mysql.createPool({
  39. connectionLimit: db.poolSize,
  40. host: db.host,
  41. port: db.port,
  42. database: db.database,
  43. password: db.password,
  44. user: db.user
  45. });
  46. }
  47.  
  48. query (query: string): Promise<TDataRow> {
  49. return new Promise((resolve, reject) => {
  50. // get a connection
  51. this._pool.getConnection((err, connection) => {
  52. if (err) {
  53. reject(err);
  54. return;
  55. }
  56.  
  57. connection.query(query, (err, rows) => {
  58. connection.release();
  59.  
  60. if (err) {
  61. reject(err);
  62. return;
  63. }
  64.  
  65. resolve(rows);
  66. });
  67. });
  68. });
  69. }
  70.  
  71. /**
  72. * Provides a transaction interface
  73. * @param fn
  74. * @returns {Promise<*>}
  75. */
  76. transaction (fn: (t: ITransaction) => Promise<*>): Promise<*> {
  77. return new Promise((resolve, reject) => {
  78. // get a connection
  79. this._pool.getConnection((err, connection) => {
  80. if (err) {
  81. reject(err);
  82. return;
  83. }
  84.  
  85. // begin transaction
  86. connection.beginTransaction(err => {
  87. if (err) {
  88. reject(err);
  89. return;
  90. }
  91.  
  92. // create transaction interface
  93. const transaction: ITransaction = {
  94. query (query: string): Promise<TDataRow> {
  95. return new Promise((resolve, reject) => {
  96. connection.query(query, (err, rows) => {
  97. if (err) {
  98. reject(err);
  99. return;
  100. }
  101.  
  102. resolve(rows);
  103. });
  104. });
  105. }
  106. };
  107.  
  108. // pass interface to caller
  109. fn(transaction)
  110. .then(result => {
  111. // commit transaction
  112. connection.commit(err => {
  113. if (err) {
  114. throw err;
  115. }
  116.  
  117. resolve(result);
  118. });
  119. })
  120. .catch(err => {
  121. // rollback transaction
  122. connection.rollback(() => {
  123. reject(err);
  124. });
  125. });
  126. });
  127. });
  128. });
  129. }
  130. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement