Advertisement
Guest User

Untitled

a guest
May 31st, 2016
69
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.55 KB | None | 0 0
  1. import base64
  2. import json
  3. import sys
  4.  
  5. from twisted.application import internet
  6. from twisted.application.service import Service
  7. from twisted.internet import reactor
  8. from twisted.internet.endpoints import TCP4ClientEndpoint
  9. from twisted.python.log import ILogObserver
  10. from twisted.web import resource, server
  11.  
  12. from kademlia import log
  13. from mqtt.client.factory import MQTTFactory
  14. from mqtt import v311
  15.  
  16. from conf import utils
  17. from security import Storage
  18.  
  19.  
  20. class CollectorService:
  21. """ Collector Service Definition.
  22. """
  23.  
  24. def __init__(self, application, persistence, kserver, dStream, http_port, mqtt_ip, mqtt_port, topic):
  25. """ Create the Collector Service.
  26.  
  27. Arguments:
  28. application: twisted application to bind the Service.
  29. kserver: Kademlia service server reference.
  30. port: TCP Port for listening HTTP Requests.
  31. """
  32. application.setComponent(ILogObserver, log.FileLogObserver(sys.stdout, log.INFO).emit)
  33.  
  34. # MQTT Collector
  35. factory = MQTTFactory(profile=MQTTFactory.PUBLISHER | MQTTFactory.SUBSCRIBER)
  36. point = TCP4ClientEndpoint(reactor, mqtt_ip, mqtt_port)
  37. subscriber = Subscriber(persistence, kserver, dStream, 'dioti-subs', topic)
  38. d = point.connect(factory)
  39. d.addCallback(subscriber.gotProtocol)
  40.  
  41.  
  42. class Subscriber(Service):
  43. """ Service Class responsible for providing the MQTT methods for
  44. the communication with the DHT.
  45. """
  46.  
  47. def __init__(self, persistence, kserver, dStream, subscriber, topic):
  48. """ Create the DHT handler for MQTT.
  49.  
  50. Arguments:
  51. kserver: kademlia service reference.
  52. subscriber: subscriber identifier.
  53. topic: topic to subscribe.
  54. """
  55. self.protocol = None
  56. self.kserver = kserver
  57. self.persistence = persistence
  58. self.dStream = dStream
  59. self.subscriber = subscriber
  60. self.topic = topic
  61.  
  62. def gotProtocol(self, p):
  63. """ Connect to MQTT Broker
  64.  
  65. Arguments:
  66. p: protocol instance.
  67. """
  68. self.protocol = p
  69. d = p.connect(self.subscriber, keepalive=0, version=v311)
  70. d.addCallback(self.subscribe)
  71.  
  72. def subscribe(self, *args):
  73. """ Subscribe a MQTT topic.
  74. """
  75. self.protocol.subscribe(self.topic)
  76. self.protocol.setPublishHandler(self.onPublish)
  77.  
  78. def onPublish(self, topic, payload, qos, dup, retain, msgId):
  79. """ Event handler for publishing of subscribed events.
  80.  
  81. Arguments:
  82. topic: message topic.
  83. payload: received data.
  84. """
  85. def cb_published(response):
  86. print "Set in Database Cluster (HTTP)."
  87. print response
  88.  
  89. print ("ON PUBLISH")
  90. key = topic
  91. dataToSend = json.loads(str(payload.decode()))
  92. # Verify if data was previously sent to database and DHT
  93. if dataToSend['distribution'] is False:
  94. # Distribute data for Streaming
  95. dataToSend['distribution'] = True
  96. #self.dStream.publish(key, json.dumps(dataToSend))
  97. log.msg("Setting %s = %s" % (key, json.dumps(utils.byteify(dataToSend))))
  98. # DHT Publish Assertion
  99. self.kserver.set(key, json.dumps(dataToSend))
  100. # Persistence Publish
  101. d = self.persistence.db_handler.insert(key, utils.byteify(dataToSend))
  102. d.addCallback(cb_published)
  103. return server.NOT_DONE_YET
  104. else:
  105. print("IN DISTRIBUTION")
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement