Guest User

Untitled

a guest
May 31st, 2016
62
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.24 KB | None | 0 0
  1. import base64
  2. import json
  3. from StringIO import StringIO
  4.  
  5. from twisted.application import internet
  6. from twisted.application.service import Service
  7. from twisted.web import resource, server
  8. from twisted.internet import reactor
  9. from twisted.internet.endpoints import TCP4ClientEndpoint
  10. from twisted.web.client import Agent, FileBodyProducer, readBody
  11. from twisted.web.http_headers import Headers
  12. from services.txbonjour import discovery
  13.  
  14. from mqtt.client.factory import MQTTFactory
  15. from mqtt import v311
  16.  
  17. from conf import utils
  18. from security import Storage
  19.  
  20.  
  21. class DistributeStreamService:
  22.  
  23. def __init__(self, ip='', port=''):
  24.  
  25. self.ip = ip
  26. self.port = port
  27. self.streamers = []
  28. self.streamerPoints = {}
  29. # Start mDNS
  30. proto = DiscoverProtocol(self)
  31. dservice = discovery.listenBonjour(proto, '_diotiStream._tcp')
  32. dservice.startService()
  33.  
  34. def publish(self, topic, data):
  35. print "Start Streaming1"
  36. for ip, streamer in self.streamerPoints.iteritems():
  37. print "Start Streaming3"
  38. print topic
  39. print streamer
  40. #streamer.publish(topic, data)
  41.  
  42. class Publisher(Service):
  43.  
  44. def __init__(self, publ_name):
  45. self.protocol = None
  46. self.publ_name = publ_name
  47.  
  48. def gotProtocol(self, p):
  49. self.protocol = p
  50. print ("connecting")
  51. return p.connect(self.publ_name, keepalive=0, version=v311)
  52.  
  53. def publish(self, topic, data):
  54. d = self.protocol.publish(topic=topic, message=data)
  55. d.addErrback(self.printError)
  56.  
  57. def printError(self, *args):
  58. print(args)
  59.  
  60.  
  61. class DiscoverProtocol(discovery.BroadcastProtocol):
  62.  
  63. def __init__(self, ds):
  64. self.ds = ds
  65. self.tempList = []
  66.  
  67. def registerReceived(self, *args):
  68. print "now broadcasting"
  69.  
  70. def addService(self, *args):
  71.  
  72. def connected(params):
  73. print "connected"
  74. self.ds.streamerPoints[args[0].name] = publisher
  75. self.tempList = filter(lambda x: x != args[0].name, self.tempList)
  76.  
  77. if args[0].name not in self.ds.streamerPoints and args[0].name not in self.tempList:
  78. self.tempList.append(args[0].name)
  79. print (self.tempList)
  80. #self.ds.streamers.append(args[0].name)
  81.  
  82. if args[0].name != 'http://' + self.ds.ip + ":" + str(self.ds.port):
  83. dest_port = int(args[0].name.split(':')[2].encode())
  84. dest_ip = args[0].name.split(':')[1].replace('/', '').encode()
  85.  
  86. factory = MQTTFactory(profile=MQTTFactory.PUBLISHER | MQTTFactory.SUBSCRIBER)
  87. point = TCP4ClientEndpoint(reactor, dest_ip, dest_port)
  88. publisher = Publisher('dioti-dist')
  89. d = point.connect(factory)
  90. d.addCallback(publisher.gotProtocol)
  91. #d.addCallback(connected)
  92. else:
  93. self.tempList = filter(lambda x: x != args[0].name, self.tempList)
  94.  
  95. def removeService(self, *args):
  96. self.ds.streamerPoints.pop(args[0].name, None)
  97.  
  98. def browseError(self, *args):
  99. print "browseError"
  100.  
  101. def resolveError(self, err, *args):
  102. print "resolveError"
Add Comment
Please, Sign In to add comment