Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import base64
- import json
- import sys
- from twisted.application import internet
- from twisted.application.service import Service
- from twisted.internet import reactor
- from twisted.internet.endpoints import TCP4ClientEndpoint
- from twisted.python.log import ILogObserver
- from twisted.web import resource, server
- from kademlia import log
- from mqtt.client.factory import MQTTFactory
- from mqtt import v311
- from conf import utils
- from security import Storage
- class CollectorService:
- """ Collector Service Definition.
- """
- def __init__(self, application, persistence, kserver, dStream, http_port, mqtt_ip, mqtt_port, topic):
- """ Create the Collector Service.
- Arguments:
- application: twisted application to bind the Service.
- kserver: Kademlia service server reference.
- port: TCP Port for listening HTTP Requests.
- """
- application.setComponent(ILogObserver, log.FileLogObserver(sys.stdout, log.INFO).emit)
- # MQTT Collector
- factory = MQTTFactory(profile=MQTTFactory.PUBLISHER | MQTTFactory.SUBSCRIBER)
- point = TCP4ClientEndpoint(reactor, mqtt_ip, mqtt_port)
- subscriber = Subscriber(persistence, kserver, dStream, 'dioti-subs', topic)
- d = point.connect(factory)
- d.addCallback(subscriber.gotProtocol)
- class Subscriber(Service):
- """ Service Class responsible for providing the MQTT methods for
- the communication with the DHT.
- """
- def __init__(self, persistence, kserver, dStream, subscriber, topic):
- """ Create the DHT handler for MQTT.
- Arguments:
- kserver: kademlia service reference.
- subscriber: subscriber identifier.
- topic: topic to subscribe.
- """
- self.protocol = None
- self.kserver = kserver
- self.persistence = persistence
- self.dStream = dStream
- self.subscriber = subscriber
- self.topic = topic
- def gotProtocol(self, p):
- """ Connect to MQTT Broker
- Arguments:
- p: protocol instance.
- """
- self.protocol = p
- d = p.connect(self.subscriber, keepalive=0, version=v311)
- d.addCallback(self.subscribe)
- def subscribe(self, *args):
- """ Subscribe a MQTT topic.
- """
- self.protocol.subscribe(self.topic)
- self.protocol.setPublishHandler(self.onPublish)
- def onPublish(self, topic, payload, qos, dup, retain, msgId):
- """ Event handler for publishing of subscribed events.
- Arguments:
- topic: message topic.
- payload: received data.
- """
- def cb_published(response):
- print "Set in Database Cluster (HTTP)."
- print response
- print ("ON PUBLISH")
- key = topic
- dataToSend = json.loads(str(payload.decode()))
- # Verify if data was previously sent to database and DHT
- if dataToSend['distribution'] is False:
- # Distribute data for Streaming
- dataToSend['distribution'] = True
- #self.dStream.publish(key, json.dumps(dataToSend))
- log.msg("Setting %s = %s" % (key, json.dumps(utils.byteify(dataToSend))))
- # DHT Publish Assertion
- self.kserver.set(key, json.dumps(dataToSend))
- # Persistence Publish
- d = self.persistence.db_handler.insert(key, utils.byteify(dataToSend))
- d.addCallback(cb_published)
- return server.NOT_DONE_YET
- else:
- print("IN DISTRIBUTION")
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement