Guest User

Untitled

a guest
Dec 20th, 2017
103
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.72 KB | None | 0 0
  1. import asyncio
  2.  
  3. def post_json(self, uri: str, data: Union['dict', 'list']) -> Awaitable[HTTPResponse]:
  4. return self._session.request('POST', urljoin(self._hostname, uri), json=data)
  5.  
  6. def get(self, uri: 'str', extra_headers: 'dict'=None, params: dict=None) -> Awaitable[HTTPResponse]:
  7. return self._session.request('GET', urljoin(self._hostname, uri))
  8.  
  9. def close(self):
  10. return self._session.close()
  11.  
  12. def __init__(self):
  13. self._http_client = HttpWrapper(hostname=self._host_name,
  14. global_headers={'Accept': 'application/json', 'Content-Type': 'application/json'},
  15. user='user',
  16. password='password')
  17. self._notifications_identifier = None
  18. self._keep_running = False
  19. self._loop = asyncio.get_event_loop()
  20. self._log = logging.getLogger("NotificationsHandler")
  21.  
  22. async def register_for_notifications(self) -> bool:
  23. body = { "ietf-subscribed-notifications:input": {"encoding": "encode-json"} }
  24. try:
  25. async with self._http_client.post_json(uri='ietf-subscribed-notifications:establish-subscription', data=body) as resp:
  26. json_resp = await resp.json()
  27. self._notifications_identifier = json_resp['ietf-subscribed-notifications:output']['identifier']
  28. self._keep_running = True
  29. except Exception as error:
  30. self._log.error('Failed to register for notifications: {}'.format(error))
  31. self._log.debug(traceback.format_exc())
  32. self._notifications_identifier = None
  33. self._keep_running = False
  34. return self._keep_running
  35.  
  36. async def handle_notifications(self, notifications_callback):
  37. """
  38. handle the notifications
  39. :param notifications_callback: coro which receives the notification as JSON
  40. :return: None
  41. """
  42. async with self._http_client.get(uri='get-notifications-json',
  43. extra_headers={'Cache-Control': 'no-cache', 'Accept': 'text/event-stream'}) as response:
  44. buffer = b""
  45. async for data, end_of_http_chunk in response.content.iter_chunks():
  46. buffer += data
  47. if end_of_http_chunk:
  48. try:
  49. update_json = json.loads(buffer)
  50. if update_json['identifier'] == self._notifications_identifier:
  51. self._loop.create_task(notifications_callback(update_json))
  52. except:
  53. self._log.exception("Failed to read update")
  54. buffer = b""
  55. if not self._keep_running:
  56. return
  57.  
  58. def close(self):
  59. self._http_client.close()
  60. self.stop_handle_notifications()
  61.  
  62. async def stop_handle_notifications(self):
  63. self._keep_running = False
Add Comment
Please, Sign In to add comment