Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import base64
- import json
- from StringIO import StringIO
- from twisted.application import internet
- from twisted.application.service import Service
- from twisted.web import resource, server
- from twisted.internet import reactor
- from twisted.internet.endpoints import TCP4ClientEndpoint
- from twisted.web.client import Agent, FileBodyProducer, readBody
- from twisted.web.http_headers import Headers
- from services.txbonjour import discovery
- from mqtt.client.factory import MQTTFactory
- from mqtt import v311
- from conf import utils
- from security import Storage
class DistributeStreamService:
    """Hold the set of peer streamers discovered via Bonjour and fan data out to them.

    Attributes:
        ip, port: this node's own address exactly as passed in; DiscoverProtocol
            compares discovered names against 'http://<ip>:<port>'.
        streamers: list, created empty; nothing in this class reads it.
        streamerPoints: dict keyed by discovered service name; values are the
            publisher objects registered by DiscoverProtocol.
    """

    def __init__(self, ip='', port=''):
        """Store the local address and start Bonjour discovery immediately.

        Constructing an instance has a side effect: it creates a
        DiscoverProtocol bound to this object, passes it to
        discovery.listenBonjour for the '_diotiStream._tcp' service type and
        calls startService() on the result.
        """
        self.ip = ip
        self.port = port
        self.streamers = []
        self.streamerPoints = {}
        # Start mDNS
        proto = DiscoverProtocol(self)
        dservice = discovery.listenBonjour(proto, '_diotiStream._tcp')
        dservice.startService()

    def publish(self, topic, data):
        """Trace one publish round over every registered streamer.

        The actual hand-off to the streamer is still disabled (see the
        commented call below), so `data` is currently unused and the method
        only prints progress markers, the topic and each streamer object.
        Returns None.
        """
        # Single-argument print(...) emits the same text under Python 2's
        # print statement and Python 3's print function, and matches the call
        # form already used elsewhere in this file.
        print("Start Streaming1")
        # Only the publisher objects are needed; the service-name keys are not.
        for streamer in self.streamerPoints.values():
            print("Start Streaming3")
            print(topic)
            print(streamer)
            #streamer.publish(topic, data)
class Publisher(Service):
    """Twisted service wrapper around one MQTT protocol instance.

    The protocol is unknown until gotProtocol() is called with it; publish()
    must not be used before then because it dereferences self.protocol.
    """

    def __init__(self, publ_name):
        """Remember the client name used for the MQTT connect; no protocol yet."""
        self.publ_name = publ_name
        self.protocol = None

    def gotProtocol(self, p):
        """Adopt protocol `p` and start the MQTT connect handshake on it.

        Returns whatever p.connect(...) returns, so it can be used directly as
        a Deferred callback.
        """
        self.protocol = p
        print("connecting")
        handshake = p.connect(self.publ_name, keepalive=0, version=v311)
        return handshake

    def publish(self, topic, data):
        """Send `data` on `topic` through the adopted protocol.

        Failures of the publish are routed to printError; nothing is returned.
        """
        pending = self.protocol.publish(topic=topic, message=data)
        pending.addErrback(self.printError)

    def printError(self, *args):
        """Errback: print the received arguments as a tuple."""
        print(args)
class DiscoverProtocol(discovery.BroadcastProtocol):
    """Bonjour callbacks that open an MQTT client connection per discovered peer.

    Attributes:
        ds: the owning object; this class reads ds.ip, ds.port and
            ds.streamerPoints.
        tempList: service names for which a connection attempt has been
            started and not yet cleared.
    """

    def __init__(self, ds):
        """Bind the protocol to its owner `ds` and start with nothing pending."""
        self.ds = ds
        self.tempList = []

    def registerReceived(self, *args):
        """Trace hook; only prints a marker."""
        print("now broadcasting")

    def addService(self, *args):
        """Start an MQTT connection to a newly seen service.

        args[0].name is expected to look like 'http://<ip>:<port>'. Names that
        are already registered in ds.streamerPoints or already pending in
        tempList are ignored. This node's own address is never dialled. A
        name whose host/port cannot be parsed is reported and dropped instead
        of raising out of the callback.
        """
        name = args[0].name

        def forget_pending():
            # A list comprehension keeps tempList a real list on Python 3,
            # where filter() returns an iterator that has no .append.
            self.tempList = [n for n in self.tempList if n != name]

        def connected(params):
            # Success callback; see the NOTE(review) at the disabled hookup.
            print("connected")
            self.ds.streamerPoints[name] = publisher
            forget_pending()

        def connect_failed(failure):
            # Without an errback a refused or timed-out connection goes
            # unhandled and the name stays pending forever, so a later
            # announcement of the same service could never retry.
            print("connect to %s failed: %s" % (name, failure))
            forget_pending()

        if name in self.ds.streamerPoints or name in self.tempList:
            return

        self.tempList.append(name)
        print(self.tempList)
        #self.ds.streamers.append(args[0].name)

        # NOTE(review): the original indentation was lost; the `else` branch
        # that clears the pending entry is assumed to pair with this
        # own-address check -- confirm against the original file.
        if name == 'http://' + self.ds.ip + ":" + str(self.ds.port):
            forget_pending()
            return

        # 'http://1.2.3.4:1883'.split(':') -> ['http', '//1.2.3.4', '1883']
        parts = name.split(':')
        try:
            dest_ip = parts[1].replace('/', '').encode()
            dest_port = int(parts[2])
        except (IndexError, ValueError):
            print("cannot parse host/port from service name %r" % (name,))
            forget_pending()
            return

        factory = MQTTFactory(profile=MQTTFactory.PUBLISHER | MQTTFactory.SUBSCRIBER)
        point = TCP4ClientEndpoint(reactor, dest_ip, dest_port)
        publisher = Publisher('dioti-dist')
        d = point.connect(factory)
        d.addCallback(publisher.gotProtocol)
        # NOTE(review): the success hookup was disabled in the original and is
        # left disabled here; until it is enabled nothing is ever added to
        # ds.streamerPoints.
        #d.addCallback(connected)
        d.addErrback(connect_failed)

    def removeService(self, *args):
        """Drop the service named args[0].name from ds.streamerPoints, if present."""
        self.ds.streamerPoints.pop(args[0].name, None)

    def browseError(self, *args):
        """Trace hook; only prints a marker."""
        print("browseError")

    def resolveError(self, err, *args):
        """Trace hook; only prints a marker."""
        print("resolveError")
Add Comment
Please, Sign In to add comment