Advertisement
Guest User

Untitled

a guest
Dec 17th, 2016
84
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 9.84 KB | None | 0 0
  1. 'use strict'
  2.  
  3. const defaults = require('../defaults.js');
  4. const dyncol = require('dyncol'); // added 2016-12-17
  5. const events = require('events');
  6. const pckg = require('../package.json');
  7. const sequelize = require('sequelize'); // added 2016-12-17
  8. const util = require('util');
  9.  
  10. const sql = require('mariasql');
  11.  
  12. const dbGenerator = (options, self) => {
  13. let dbOptions = Object.assign({}, options.mariasql);
  14. delete dbOptions.db;
  15. let db = new sql(dbOptions);
  16. db.on('ready', () => {
  17. db.query('CREATE DATABASE IF NOT EXISTS ' + options.mariasql.db, (e, rows) => {
  18. if (e) return self.emit('error', e);
  19. db.query('use ' + options.mariasql.db, (e2, rows2) => {
  20. if (e2) return self.emit('error', e2);
  21. db.query(
  22. ('CREATE TABLE IF NOT EXISTS ' + options.table.name +
  23. ' (pk bigint AUTO_INCREMENT PRIMARY KEY, ds_key ' + options.table.keyType +
  24. ', ds_value ' + options.table.valueType +
  25. ', KEY ds_key (ds_key(32)), CONSTRAINT `ds_key_unique` UNIQUE(`ds_key`(32))) ENGINE=InnoDB DEFAULT CHARSET UTF8;'),
  26. (e3, rows3) => {
  27. if (e3) return self.emit('error', e3);
  28. db.query('SHOW TABLES', (e4, rows4) => {
  29. if (e4) return self.emit('error', e4);
  30. rows4.forEach((x, i, ar) => {
  31. self._tableList.push(x.Tables_in_deepstream);
  32. });
  33. });
  34. }
  35. );
  36. self.isReady = true;
  37. self.emit('ready');
  38. });
  39. });
  40. });
  41. db.on('error', (e) => {
  42. if (e.message.includes(`Unknown database \'${self._dbName}\'`)) {
  43. let cleanOptions = Object.assign({}, options);
  44. cleanOptions.mariasql = Object.assign({}, options.mariasql);
  45. cleanOptions.table = Object.assign({}, options.table);
  46. delete cleanOptions.mariasql.db;
  47. db = dbGenerator(cleanOptions, self);
  48. } else self.emit('error', e);
  49. });
  50. db.end();
  51. db.connect();
  52. return db;
  53. };
  54.  
  55. class Connector extends events.EventEmitter {
  56. constructor (options) {
  57. super();
  58. if (! (typeof options === 'object')) throw new TypeError('Incorrect connection options passed');
  59. this.isReady = false;
  60. this.name = pckg.name;
  61. this.version = pckg.version;
  62. options = Object.assign({}, defaults, options);
  63. options.mariasql = Object.assign({}, defaults.mariasql, options.mariasql);
  64. options.table = Object.assign({}, defaults.table, options.table);
  65. if (process.env.ds_host) options.mariasql.host = process.env.ds_host;
  66. if (process.env.ds_user) options.mariasql.user = process.env.ds_user;
  67. if (process.env.ds_password) options.mariasql.password = process.env.ds_password;
  68. if (process.env.ds_databaseName) options.mariasql.db = process.env.ds_databaseName;
  69. if (process.env.ds_tableName) options.table.name = process.env.ds_tableName;
  70. if (process.env.ds_keyType) options.table.keyType = process.env.ds_keyType;
  71. if (process.env.ds_valueType) options.table.valueType = process.env.ds_valueType;
  72. if (process.env.ds_splitter) options.splitter = process.env.ds_splitter;
  73. this.options = Object.assign({}, options);
  74. this._dbName = options.mariasql.db;
  75. this._table = options.table;
  76. this._splitter = options.splitter;
  77. this._tableList = [];
  78. this._db = dbGenerator(options, this);
  79. }
  80.  
  81. _upsert (tableName, key, value) {
  82. return new Promise((go, stop) => {
  83.  
  84. if ( value._v && value._d ) { // ignore the blank ones
  85.  
  86. var sqlInsertUpdate = 'INSERT INTO '
  87. + tableName
  88. + ' SET ds_key = ?'
  89. + ' , ds_value ='
  90. + dyncol.createQuery(value)
  91. + ' ON DUPLICATE KEY UPDATE '
  92. + ' ds_value ='
  93. + dyncol.updateQuery('ds_value', value)
  94. + ';'
  95. ;
  96. this._db.query(sqlInsertUpdate, [ key ], {metadata: true}, (e, rows) => {
  97. if (e) {
  98. console.log('update error! ' + e);
  99. return stop(e);
  100. } else {
  101. return go();
  102. }
  103. });
  104. } else {
  105. return go(); // ignore the blank ones
  106. }
  107. });
  108. }
  109.  
  110. set (key, value, callback) {
  111. var splitted = undefined;
  112. try {
  113. splitted = key.split(this._splitter);
  114. } catch (e) {
  115. return callback(e);
  116. };
  117. let tableName = (splitted.length > 1) ? splitted[0] : this.options.table.name;
  118. if (this._tableList.includes(tableName)) {
  119. this._upsert(tableName, key, value).then(() => {
  120. return callback(null);
  121. }).catch((e) => {
  122. return callback(e);
  123. });
  124. } else {
  125. this._db.query(
  126. ('CREATE TABLE IF NOT EXISTS ' + tableName +
  127. ' (pk bigint AUTO_INCREMENT PRIMARY KEY, ds_key ' + this._table.keyType +
  128. ', ds_value ' + this._table.valueType +
  129. ', KEY `ds_key` (`ds_key`(32)), CONSTRAINT ds_key_unique UNIQUE (`ds_key`(32)) ) ENGINE=InnoDB DEFAULT CHARSET UTF8;'),
  130. (e, rows) => {
  131. if (e) return this.emit('error', e);
  132. this._upsert(tableName, key, value).then(() => {
  133. return callback(null);
  134. }).catch((e) => {
  135. return cllback(e);
  136. });
  137. }
  138. );
  139. };
  140.  
  141. }
  142.  
  143. get (key, callback) {
  144. console.log('IN get ' );
  145.  
  146. var splitted = undefined;
  147. try {
  148. splitted = key.split(this._splitter);
  149. } catch (e) {
  150. console.log ('error on split! ' + e );
  151. return callback(e);
  152. };
  153. let tableName = (splitted.length > 1) ? splitted[0] : this._table.name;
  154. console.log('tableName: ' + tableName );
  155. let sqlSelect = 'SELECT COLUMN_JSON(ds_value) "ds_value" '
  156. + 'FROM '
  157. + tableName
  158. + ' WHERE 1'
  159. + ' AND ds_key = ?'
  160. ;
  161. console.log('sqlSelect: ' + sqlSelect);
  162. this._db.query(sqlSelect, [ key ], (e, rows) => {
  163. console.log('rows: ' + util.inspect(rows));
  164. if (e) {
  165. console.log('error on initial SELECT: ' + e );
  166. return callback(e);
  167. }
  168. if ( rows[0] ) {
  169. var sequelizedJson = sequelize.col(rows[0].ds_value);
  170. //return callback(null, JSON.parse('{"_v":7,"_d":{"atata":"fake fake!"}}'));
  171. return callback(null, JSON.parse(sequelizedJson.col));
  172. } else {
  173. return callback(null, null);
  174. }
  175. });
  176. }
  177.  
  178. delete (key, callback) {
  179. var splitted = undefined;
  180. try {
  181. splitted = key.split(this._splitter);
  182. } catch (e) {
  183. return callback(e);
  184. };
  185. let tableName = (splitted.length > 1) ? splitted[0] : this._table.name;
  186. this._db.query('delete from ' + tableName + ' where ds_key = ?', [ key ], (e, rows) => {
  187. if (e) callback(e);
  188. return callback(null);
  189. });
  190. }
  191. }
  192.  
  193. module.exports = Connector
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement