Advertisement
Guest User

Untitled

a guest
Feb 28th, 2018
131
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.80 KB | None | 0 0
  1. # -*- coding: utf-8 -*-
  2. """
  3. Created on Sun Feb 18 13:38:44 2018
  4.  
  5. @author: Richard
  6. AAAAB3NzaC1yc2EAAAABJQAAAQEAwxk/qzNvj/XEploppD7RuJQRqoP8bOUDjTiAYvzCCgiBp9HnHrALbiyRFQcNWtQY3j0tlFZ1qhEju78Pe9wWsgw5+mlcTsX15vDcmMF6oF4IctSQpyrnQnzUs7F+QuVnPkfKT22VA4y9id5K5rHQkNb0HOqMl5PMn3pi+ALzy8RsYgyow5YhNNE7gy/nTkZhdjkJimB+pTWl6Qj8IxgWpm9v6YXOoaEU2pZMCaqA8LiGHrj8Xl6wV0T7xF2HgK48vISlWpDFc/zfJtMFtuud1esMYNoGePoCGHn932chfeWR38e08EZGcUPVwimU4Av03rqa1MdmjRhT7SH4xbKhtQ== rsa-key-20171206
  7. """
  8. import cPickle
  9. import requests
  10.  
  11. import time
  12. import threading
  13. import utils
  14. class HTTPUplink:
  15. def __init__(self,db,defaultinterval,l):
  16. self.log=l
  17. self.db=db
  18. self.endpoints={}
  19. self.ps=self.db.pubsub()
  20. self.ps.subscribe(**{'HTTP':self.recv_HTTP})
  21. self.s = requests.Session()
  22. self.default_interval=defaultinterval
  23. def recv_HTTP(self,m):
  24. r= cPickle.loads(m['data'])
  25. if type(r)== requests.Response: #feedback
  26. self.log.info('resp seen')
  27. return
  28. self.log.info('req received from HTTP channel')
  29. url = utils.getServerAddressFromURL(r.url)
  30. if url not in self.endpoints:
  31. self.endpoints[url]={'dummytime':0.0,'lasttime':0.0,'interval':self.default_interval}
  32. endpoint = self.endpoints[url]
  33. endpoint['dummytime']+=endpoint['interval']
  34. o_lrt = endpoint['lasttime']
  35. endpoint['lasttime']=time.clock()
  36. endpoint['dummytime']-=(endpoint['lasttime']-o_lrt)
  37. endpoint['dummytime']=max(endpoint['dummytime'],0.0)
  38. s= 'preparing to send request with delay: %f ' % endpoint['dummytime']
  39. self.log.info(s)
  40. t =threading.Timer(endpoint['dummytime'],self.send,[r])
  41. t.start()
  42. def send(self,r):
  43. self.log.info('waiting on server for resp in thread')
  44. resp = requests.Response()
  45. try:
  46. resp = self.s.send(r,timeout=2)
  47. resp.raise_for_status()
  48. except (requests.exceptions.HTTPError, requests.exceptions.ConnectionError,
  49. requests.exceptions.ConnectTimeout, requests.exceptions.ReadTimeout) as e:
  50. self.log.warning(e.message)
  51. if 399<resp.status_code<500:
  52. self.log.warning('4xx error, not republishing.')
  53. else:
  54. self.log.info('republishing request')
  55. self.db.publish('HTTP',cPickle.dumps(r))#send this shit right back through
  56. return #don't give callback 500+ errors
  57. finally:
  58. if resp.status_code <500:
  59. resp.content
  60. pr=cPickle.dumps(resp)
  61. self.log.info('sending response back through HTTP channel')
  62. self.db.publish('HTTP',pr)
  63. def run(self):
  64. for x in self.ps.listen():
  65. pass
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement