Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- // constructor
- const {Client} = require('pg');
- const colors = require('colors');
- const helpers = require('./helpers');
- const db = new Client({
- host: '127.0.0.1',
- port: 5432,
- user: 'postgres',
- password: 'votec1234',
- database: 'postgres',
- connectionString: 'postgres://postgres:votec1234@127.0.0.1:5432/postgres'
- });
- db.connect((err) => {
- if (err) {
- console.error('db:error', err.stack);
- } else {
- console.log('db:connected'.green);
- }
- });
- // constructor
- // mqtt event connect
- mqtt.on('connect', () => {
- mqtt.subscribe(['device','setting-update']);
- console.log('topic device listening...'.green);
- // interaval pruduct modunda kalkacak message eventinden yürüyecek.
- setInterval(() => {
- deviceTimeout();
- heartbeatTestPublish();
- },1000);
- });
- // mqtt event message
- mqtt.on('message', (topic, message) => {
- if(topic === 'device'){
- createUpdate(message);
- deviceTimeout(topic);
- }
- else if(topic === 'setting-update'){
- deviceTimeout(topic);
- }
- });
- // Algorithm Of Heartbeat aka Connection Time Out ...
- async function heartbeatTestPublish(){
- mqtt.publish('device', JSON.stringify({device_instance: 'unique_device_01', device_name: 'device_name_01', status: 1, device_type: 'air_conditioner'}));
- mqtt.publish('device', JSON.stringify({device_instance: 'unique_device_02', device_name: 'device_name_02', status: 1, device_type: 'air_conditioner'}));
- mqtt.publish('device', JSON.stringify({device_instance: 'unique_device_03', device_name: 'device_name_03', status: 1, device_type: 'air_conditioner'}));
- mqtt.publish('device', JSON.stringify({device_instance: 'unique_device_04', device_name: 'device_name_04', status: 1, device_type: 'alarm_panel'}));
- mqtt.publish('device', JSON.stringify({device_instance: 'unique_device_05', device_name: 'device_name_05', status: 1, device_type: 'alarm_panel'}));
- mqtt.publish('device', JSON.stringify({device_instance: 'unique_device_06', device_name: 'device_name_06', status: 1, device_type: 'alarm_panel'}));
- mqtt.publish('device', JSON.stringify({device_instance: 'unique_device_07', device_name: 'device_name_07', status: 1, device_type: 'heat_sensor'}));
- mqtt.publish('device', JSON.stringify({device_instance: 'unique_device_08', device_name: 'device_name_08', status: 1, device_type: 'heat_sensor'}));
- mqtt.publish('device', JSON.stringify({device_instance: 'unique_device_09', device_name: 'device_name_09', status: 1, device_type: 'electricity_lights'}));
- mqtt.publish('device', JSON.stringify({device_instance: 'unique_device_20', device_name: 'device_name_10', status: 1, device_type: 'reactive_power_controller'}));
- }
- async function createUpdate(message){
- const parse = JSON.parse(message.toString());
- parse.published = [parse.device_instance, parse.device_name, parse.status, parse.device_type];
- const update = {};
- update.data = [parse.device_instance, parse.status];
- update.text = 'UPDATE public.devices SET status = $2, updated_at = CURRENT_TIMESTAMP WHERE device_instance = $1';
- update.query = await db.query(update.text, update.data);
- if(update.query.rowCount === 0){
- const insert = {};
- insert.text = 'INSERT INTO public.devices (device_instance, device_name, status, device_type) VALUES ($1,$2,$3,$4)';
- insert.query = await db.query(insert.text, parse.published);
- console.log('new device added'.green);
- } else {
- console.log('device info updated'.green);
- }
- }
- /*
- * Device modülünde bulunan settings tablosu sorguları minimum seviyede tutularak queryler azaltılmalı.
- * Settings Cacheleme: ayarlar bir değişkende tutulacak önbellekte tutulan ayarlar, modül start edildiğinde bir kez
- * ve settings tablosu güncellendiğinde x defa güncel hali alınacak.
- * Settings Tablosu güncellenirse settings-update topiği adı altında boş bir topic publish edilecek.
- * Publish edilen settings-update topiğine başvurularak önbellekteki veri güncellenerek run edilen modül içinde program
- * işlevine devam edilecek.
- * ESP kısmına başlanacak, OPI içerisindeki dependencyler kurulacak. Bi sonraki günü planına uyulacak.
- * */
- async function deviceTimeout(topic){
- // settings-update topiği yayınlanırsa settings verilerini güncelle değilse initten geleni kullan.
- if(topic === 'device'){
- console.log('deviceTimeOut() -> device');
- }
- else if(topic === 'setting-update'){
- console.log('deviceTimeOut() -> setting-update');
- }
- //////////////////////
- const settings = {};
- settings.text = 'SELECT device_type, heartbeat_frequency FROM public.settings ORDER BY id DESC';
- settings.query = await db.query(settings.text);
- settings.data = settings.query.rows;
- //////////////////////
- // settings tablosundaki her farklı device_type için
- for(const setting of settings.data){
- const timeouts = {};
- // eslint-disable-next-line quotes
- timeouts.text = "SELECT device_instance, device_type FROM devices WHERE status = 1 and device_type = '"+setting.device_type+"' and updated_at <= NOW() - INTERVAL '"+setting.heartbeat_frequency +" SECOND'";
- timeouts.query = await db.query(timeouts.text);
- if(timeouts.query.rowCount !== 0){
- for(const display of timeouts.query.rows){
- console.log((display.device_type+' - '+display.device_instance+' timeout...').red);
- }
- }
- // timeouta alınan her device için
- for(const timeout of timeouts.query.rows){
- const update = {};
- update.data = [timeout.device_instance, 0];
- update.text = 'UPDATE public.devices SET status = $2, updated_at = CURRENT_TIMESTAMP WHERE device_instance = $1';
- update.query = await db.query(update.text, update.data);
- }
- }
- }
Add Comment
Please, Sign In to add comment