Guest User

Untitled

a guest
Jul 10th, 2018
196
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.77 KB | None | 0 0
  1. // constructor
  2.  
  3. const {Client} = require('pg');
  4. const colors = require('colors');
  5. const helpers = require('./helpers');
  6.  
  7. const db = new Client({
  8. host: '127.0.0.1',
  9. port: 5432,
  10. user: 'postgres',
  11. password: 'votec1234',
  12. database: 'postgres',
  13. connectionString: 'postgres://postgres:votec1234@127.0.0.1:5432/postgres'
  14. });
  15.  
  16. db.connect((err) => {
  17. if (err) {
  18. console.error('db:error', err.stack);
  19. } else {
  20. console.log('db:connected'.green);
  21. }
  22. });
  23. // constructor
  24.  
  25. // mqtt event connect
  26. mqtt.on('connect', () => {
  27. mqtt.subscribe(['device','setting-update']);
  28. console.log('topic device listening...'.green);
  29.  
  30. // interaval pruduct modunda kalkacak message eventinden yürüyecek.
  31. setInterval(() => {
  32. deviceTimeout();
  33. heartbeatTestPublish();
  34. },1000);
  35.  
  36. });
  37.  
  38. // mqtt event message
  39. mqtt.on('message', (topic, message) => {
  40. if(topic === 'device'){
  41. createUpdate(message);
  42. deviceTimeout(topic);
  43. }
  44. else if(topic === 'setting-update'){
  45. deviceTimeout(topic);
  46. }
  47. });
  48.  
  49. // Algorithm Of Heartbeat aka Connection Time Out ...
  50. async function heartbeatTestPublish(){
  51. mqtt.publish('device', JSON.stringify({device_instance: 'unique_device_01', device_name: 'device_name_01', status: 1, device_type: 'air_conditioner'}));
  52. mqtt.publish('device', JSON.stringify({device_instance: 'unique_device_02', device_name: 'device_name_02', status: 1, device_type: 'air_conditioner'}));
  53. mqtt.publish('device', JSON.stringify({device_instance: 'unique_device_03', device_name: 'device_name_03', status: 1, device_type: 'air_conditioner'}));
  54. mqtt.publish('device', JSON.stringify({device_instance: 'unique_device_04', device_name: 'device_name_04', status: 1, device_type: 'alarm_panel'}));
  55. mqtt.publish('device', JSON.stringify({device_instance: 'unique_device_05', device_name: 'device_name_05', status: 1, device_type: 'alarm_panel'}));
  56. mqtt.publish('device', JSON.stringify({device_instance: 'unique_device_06', device_name: 'device_name_06', status: 1, device_type: 'alarm_panel'}));
  57. mqtt.publish('device', JSON.stringify({device_instance: 'unique_device_07', device_name: 'device_name_07', status: 1, device_type: 'heat_sensor'}));
  58. mqtt.publish('device', JSON.stringify({device_instance: 'unique_device_08', device_name: 'device_name_08', status: 1, device_type: 'heat_sensor'}));
  59. mqtt.publish('device', JSON.stringify({device_instance: 'unique_device_09', device_name: 'device_name_09', status: 1, device_type: 'electricity_lights'}));
  60. mqtt.publish('device', JSON.stringify({device_instance: 'unique_device_20', device_name: 'device_name_10', status: 1, device_type: 'reactive_power_controller'}));
  61. }
  62.  
  63. async function createUpdate(message){
  64. const parse = JSON.parse(message.toString());
  65. parse.published = [parse.device_instance, parse.device_name, parse.status, parse.device_type];
  66. const update = {};
  67.  
  68. update.data = [parse.device_instance, parse.status];
  69. update.text = 'UPDATE public.devices SET status = $2, updated_at = CURRENT_TIMESTAMP WHERE device_instance = $1';
  70. update.query = await db.query(update.text, update.data);
  71.  
  72. if(update.query.rowCount === 0){
  73. const insert = {};
  74. insert.text = 'INSERT INTO public.devices (device_instance, device_name, status, device_type) VALUES ($1,$2,$3,$4)';
  75. insert.query = await db.query(insert.text, parse.published);
  76. console.log('new device added'.green);
  77. } else {
  78. console.log('device info updated'.green);
  79. }
  80. }
  81.  
  82. /*
  83. * Device modülünde bulunan settings tablosu sorguları minimum seviyede tutularak queryler azaltılmalı.
  84. * Settings Cacheleme: ayarlar bir değişkende tutulacak önbellekte tutulan ayarlar, modül start edildiğinde bir kez
  85. * ve settings tablosu güncellendiğinde x defa güncel hali alınacak.
  86. * Settings Tablosu güncellenirse settings-update topiği adı altında boş bir topic publish edilecek.
  87. * Publish edilen settings-update topiğine başvurularak önbellekteki veri güncellenerek run edilen modül içinde program
  88. * işlevine devam edilecek.
  89. * ESP kısmına başlanacak, OPI içerisindeki dependencyler kurulacak. Bi sonraki günü planına uyulacak.
  90. * */
  91. async function deviceTimeout(topic){
  92. // settings-update topiği yayınlanırsa settings verilerini güncelle değilse initten geleni kullan.
  93.  
  94. if(topic === 'device'){
  95. console.log('deviceTimeOut() -> device');
  96.  
  97. }
  98.  
  99. else if(topic === 'setting-update'){
  100. console.log('deviceTimeOut() -> setting-update');
  101.  
  102. }
  103.  
  104. //////////////////////
  105. const settings = {};
  106. settings.text = 'SELECT device_type, heartbeat_frequency FROM public.settings ORDER BY id DESC';
  107. settings.query = await db.query(settings.text);
  108. settings.data = settings.query.rows;
  109. //////////////////////
  110.  
  111. // settings tablosundaki her farklı device_type için
  112. for(const setting of settings.data){
  113. const timeouts = {};
  114. // eslint-disable-next-line quotes
  115. timeouts.text = "SELECT device_instance, device_type FROM devices WHERE status = 1 and device_type = '"+setting.device_type+"' and updated_at <= NOW() - INTERVAL '"+setting.heartbeat_frequency +" SECOND'";
  116. timeouts.query = await db.query(timeouts.text);
  117. if(timeouts.query.rowCount !== 0){
  118. for(const display of timeouts.query.rows){
  119. console.log((display.device_type+' - '+display.device_instance+' timeout...').red);
  120. }
  121. }
  122.  
  123. // timeouta alınan her device için
  124. for(const timeout of timeouts.query.rows){
  125. const update = {};
  126. update.data = [timeout.device_instance, 0];
  127. update.text = 'UPDATE public.devices SET status = $2, updated_at = CURRENT_TIMESTAMP WHERE device_instance = $1';
  128. update.query = await db.query(update.text, update.data);
  129. }
  130. }
  131. }
Add Comment
Please, Sign In to add comment