Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # -*- coding: utf-8 -*-
- """
- Created on Sun Feb 18 13:38:44 2018
- @author: Richard
- AAAAB3NzaC1yc2EAAAABJQAAAQEAwxk/qzNvj/XEploppD7RuJQRqoP8bOUDjTiAYvzCCgiBp9HnHrALbiyRFQcNWtQY3j0tlFZ1qhEju78Pe9wWsgw5+mlcTsX15vDcmMF6oF4IctSQpyrnQnzUs7F+QuVnPkfKT22VA4y9id5K5rHQkNb0HOqMl5PMn3pi+ALzy8RsYgyow5YhNNE7gy/nTkZhdjkJimB+pTWl6Qj8IxgWpm9v6YXOoaEU2pZMCaqA8LiGHrj8Xl6wV0T7xF2HgK48vISlWpDFc/zfJtMFtuud1esMYNoGePoCGHn932chfeWR38e08EZGcUPVwimU4Av03rqa1MdmjRhT7SH4xbKhtQ== rsa-key-20171206
- """
- import cPickle
- import requests
- import time
- import threading
- import utils
class HTTPUplink:
    """Rate-limited HTTP sender driven by the 'HTTP' pub/sub channel of ``db``.

    Pickled prepared requests arriving on the channel are sent through a
    shared ``requests.Session`` after a per-server delay; the resulting
    responses are pickled and published back on the same channel.  Requests
    that fail with a 5xx status or a transport error are republished so that
    they get retried.
    """

    def __init__(self, db, defaultinterval, l):
        """Subscribe to the 'HTTP' channel and set up the HTTP session.

        db              -- client object providing ``pubsub()`` and ``publish()``
        defaultinterval -- seconds of spacing added per request to one server
        l               -- logger providing ``info()`` and ``warning()``
        """
        self.log = l
        self.db = db
        # Per-server throttle state, keyed by the server address of the URL.
        self.endpoints = {}
        self.ps = self.db.pubsub()
        self.ps.subscribe(**{'HTTP': self.recv_HTTP})
        self.s = requests.Session()
        self.default_interval = defaultinterval

    @staticmethod
    def _advance_schedule(endpoint, now):
        """Charge one request to ``endpoint`` and return its send delay.

        Every request adds ``interval`` seconds to the backlog ('dummytime');
        the time elapsed since the previous request is then paid off, and the
        backlog never drops below zero.  ``endpoint`` is updated in place.
        """
        # Clamp so that a backwards clock step cannot inflate the backlog.
        elapsed = max(now - endpoint['lasttime'], 0.0)
        endpoint['lasttime'] = now
        backlog = endpoint['dummytime'] + endpoint['interval'] - elapsed
        endpoint['dummytime'] = max(backlog, 0.0)
        return endpoint['dummytime']

    def recv_HTTP(self, m):
        """Handle one message from the 'HTTP' channel.

        Responses (published by ``send``) are only logged.  Anything else is
        treated as a prepared request and scheduled for sending on a timer
        thread after the throttle delay of its target server.
        """
        # SECURITY: unpickling executes arbitrary code if the payload is
        # hostile -- only safe while every publisher on this channel is
        # trusted.
        r = cPickle.loads(m['data'])
        if isinstance(r, requests.Response):  # feedback from send()
            self.log.info('resp seen')
            return
        self.log.info('req received from HTTP channel')
        url = utils.getServerAddressFromURL(r.url)
        if url not in self.endpoints:
            self.endpoints[url] = {
                'dummytime': 0.0,
                'lasttime': 0.0,
                'interval': self.default_interval,
            }
        # Wall-clock time: time.clock() measured CPU time on Unix and no
        # longer exists in Python 3.8+, so it cannot be used for pacing.
        delay = self._advance_schedule(self.endpoints[url], time.time())
        self.log.info('preparing to send request with delay: %f ', delay)
        t = threading.Timer(delay, self.send, [r])
        t.start()

    def send(self, r):
        """Send prepared request ``r`` and publish the outcome.

        2xx/3xx and 4xx responses are published on the 'HTTP' channel.  On a
        5xx response or a transport failure (connection error, timeout) the
        request itself is republished for another attempt and no response is
        published.
        """
        self.log.info('waiting on server for resp in thread')
        resp = None  # stays None when the server never answered
        try:
            resp = self.s.send(r, timeout=2)
            resp.raise_for_status()
        except (requests.exceptions.HTTPError,
                requests.exceptions.ConnectionError,
                requests.exceptions.ConnectTimeout,
                requests.exceptions.ReadTimeout) as e:
            self.log.warning('%s', e)
            if resp is not None and 399 < resp.status_code < 500:
                # Client errors will not get better on retry; fall through
                # and hand the 4xx response to the listeners.
                self.log.warning('4xx error, not republishing.')
            else:
                self.log.info('republishing request')
                # Send it right back through the channel for a retry.
                self.db.publish('HTTP', cPickle.dumps(r))
                return  # don't give callback 500+ errors or empty responses
        # Touch the body so it is fully read before the response is pickled.
        resp.content
        pr = cPickle.dumps(resp)
        self.log.info('sending response back through HTTP channel')
        self.db.publish('HTTP', pr)

    def run(self):
        """Block forever, draining the subscription.

        Iterating ``listen()`` is what drives delivery of channel messages
        to ``recv_HTTP``; the yielded items themselves are ignored.
        """
        for _ in self.ps.listen():
            pass
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement