Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- """This module is responsible to send notifications when conditions are satisfied."""
- #!/usr/bin/python
- # -*- coding:utf-8 -*-
- import os
- import json
- from service.service import Service
- """
- Reactioner is responsible to store data from requests such as:
- - type: create or delete
- - username
- - monitor: { type: value } monitor condition to be analyzed
- - cherries: a list of cherries to collect data and analyze
- - notify_id: api id for the created notify
- with this data, reactioner monitors cherries and sends
- notification every time the monitor condition gets satified.
- To use Reactioner, just run uplug-reactioner in the shell.
- Reactioner reads some environment variables from config.json.
- Some common tasks, such as reading from a config file and creating
- a mqtt client, is made by Service module.
- """
- def __init__(self):
- self.root_path = os.path.dirname(os.path.realpath(__file__))
- super(Reactioner, self).__init__(app_path=self.root_path)
- command_topic = '$queue/user/+/{}'.format('status')
- self.subscribe(command_topic)
- self.client = MongoUtils(self.db_url, self.db_port, self.db_name)
- def __create_notify(self, message):
- '''if everithing is ok, create the notify from database.'''
- self.client.insert(message)
- def __delete_notify(self, message):
- '''if everithing is ok, delete the notify from database.'''
- self.client.remove(message)
- def __valid_notify(self, message):
- '''Verify if received message can be found on database as a
- notification request, needed fields:
- - username
- - monitor: { type: value } monitor condition to be analyzed
- - cherries: a list of cherries to collect data and analyze
- - notify_id: notify id from api.'''
- req_fields = ['username', 'monitor', 'cherries', 'notify_id']
- # must have all fields
- for item in req_fields:
- if item not in message:
- return False
- # cherries list cannot be empty or null
- cherries = message.get('cherries')
- if not isinstance(cherries, list) or not len(cherries):
- return False
- return True
- def __handle_notify(self, message):
- '''Compare message with stored notification request.
- This method should compare message received from cherry
- with the requests stored on mongo and send notification
- when necessary. Conditions to verify:
- - username has requested notifies.
- - cherry is inside the list of cherries to monitor.
- - monitor condition has been satified.
- if these three conditions are satisfied, the reactioner sends
- a notification to the user.'''
- notify_query = {
- 'username': message.get('username'),
- 'cherries': {'$in': [message.get('esp-id')]}
- }
- notifies = list(self.client.find(notify_query, {'monitor': 1, 'notify_id': 1}))
- print(notifies)
- if not notifies or not len(notifies):
- return
- for notify in notifies:
- data = {
- 'notify_id': notify.get('notify_id'),
- 'username': message.get('username'),
- 'cherry': message.get('esp-id')
- }
- for (monitor, value) in notify.get('monitor').items():
- if message.get(monitor) == value:
- if not data.get('monitor'): data['monitor'] = {}
- data['monitor'][monitor] = value
- if data['monitor']:
- self.send_notify(data)
- def status(self):
- '''Method that represents the topic of messages from cherry.
- the reactioner reads from the message, if the cherry is
- monitored, compares to the desired value and notifies.'''
- message = self.loads(self.message.payload)
- # compare with mongo notification comparator
- self.__handle_notify(message)
- def notify(self):
- '''Should save/remove a user request for notifications.'''
- message = self.loads(self.message.payload)
- data = message.get('data')
- if message.get('type') == 'create' and self.__valid_notify(data):
- self.__create_notify(data)
- elif message.get('type') == 'delete':
- self.__delete_notify(data)
- else:
- self.logger.info('Not a valid message: %s', message)
- def send_notify(self, data):
- '''Send notification to API.'''
- try:
- topic = 'user/{}/{}'.format(self.username, self.pub_topic)
- print(topic)
- self.publish('user/{}/{}'.format(self.username, self.pub_topic), json.dumps(data))
- except Exception:
- self.logger.error('data cant be parsed: %s', data)
- def main():
- """Main routine."""
- reactioner = Reactioner()
- reactioner.logger.info('Starting the reactioner from ' + reactioner.root_path)
- reactioner.loop_forever()
- if __name__ == '__main__':
- main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement