Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import asyncio
def post_json(self, uri: str, data: Union['dict', 'list']) -> Awaitable[HTTPResponse]:
    """
    Start a POST request with a JSON body against the configured host.

    :param uri: target, resolved against ``self._hostname`` with ``urljoin``
    :param data: JSON-serialisable payload, passed to the session as ``json=``
    :return: whatever ``self._session.request`` returns; it is not awaited here
    """
    # A relative reference whose first path segment contains ':' (for example
    # 'ietf-subscribed-notifications:establish-subscription') is parsed by
    # urljoin as "<scheme>:<rest>" and handed back without being joined to the
    # host.  RFC 3986 section 4.2 prescribes a leading "./" for such
    # references.  URIs that carry an explicit "scheme://" are left alone.
    if ':' in uri.split('/', 1)[0] and '://' not in uri:
        uri = './' + uri
    target = urljoin(self._hostname, uri)
    return self._session.request('POST', target, json=data)
def get(self, uri: 'str', extra_headers: 'dict'=None, params: dict=None) -> Awaitable[HTTPResponse]:
    """
    Start a GET request against the configured host.

    :param uri: target, resolved against ``self._hostname`` with ``urljoin``
    :param extra_headers: optional per-request headers, forwarded as ``headers=``
    :param params: optional query parameters, forwarded as ``params=``
    :return: whatever ``self._session.request`` returns; it is not awaited here
    """
    # A first path segment containing ':' would be read by urljoin as a URL
    # scheme and returned unjoined; RFC 3986 section 4.2 prescribes a leading
    # "./" for such relative references.
    if ':' in uri.split('/', 1)[0] and '://' not in uri:
        uri = './' + uri
    # Previously both optional arguments were accepted and then dropped.
    # Forward them only when supplied, so a plain get(uri) issues exactly the
    # same call as before.
    optional_kwargs = {}
    if extra_headers is not None:
        optional_kwargs['headers'] = extra_headers
    if params is not None:
        optional_kwargs['params'] = params
    return self._session.request('GET', urljoin(self._hostname, uri), **optional_kwargs)
def close(self):
    """Close the underlying session and hand back whatever its ``close()`` returns."""
    session = self._session
    return session.close()
def __init__(self):
    """
    Build the HTTP client and reset the notification-tracking state.

    ``self._host_name`` is read here but not assigned in this method, so it
    has to be available on the instance already (presumably a class-level
    attribute -- confirm against the class definition).
    """
    json_headers = {'Accept': 'application/json', 'Content-Type': 'application/json'}
    self._http_client = HttpWrapper(
        hostname=self._host_name,
        global_headers=json_headers,
        user='user',
        password='password',
    )
    # No subscription exists yet, and the streaming loop is not running.
    self._notifications_identifier = None
    self._keep_running = False
    # Event loop used later to schedule the notification callbacks.
    self._loop = asyncio.get_event_loop()
    self._log = logging.getLogger("NotificationsHandler")
async def register_for_notifications(self) -> bool:
    """
    Ask the server to establish a notification subscription.

    On success the identifier from the reply is stored in
    ``self._notifications_identifier`` and ``self._keep_running`` is set.
    Any exception is logged (traceback at debug level) and both attributes
    are reset.

    :return: True when the subscription was established, False otherwise
    """
    subscription_request = {"ietf-subscribed-notifications:input": {"encoding": "encode-json"}}
    try:
        pending = self._http_client.post_json(
            uri='ietf-subscribed-notifications:establish-subscription',
            data=subscription_request,
        )
        async with pending as reply:
            reply_json = await reply.json()
            output = reply_json['ietf-subscribed-notifications:output']
            self._notifications_identifier = output['identifier']
            self._keep_running = True
    except Exception as failure:
        # Best-effort boundary: report the failure through the return value
        # instead of raising to the caller.
        self._log.error('Failed to register for notifications: {}'.format(failure))
        self._log.debug(traceback.format_exc())
        self._notifications_identifier = None
        self._keep_running = False
    return self._keep_running
async def handle_notifications(self, notifications_callback):
    """
    Stream notifications from the server and dispatch those that belong to
    the current subscription.

    Bytes are accumulated until the end of an HTTP chunk, then parsed as one
    JSON document.  Documents whose ``identifier`` equals
    ``self._notifications_identifier`` are handed to ``notifications_callback``
    in a task created on ``self._loop``; other documents are ignored.  A
    document that cannot be processed is logged and skipped.  The method
    returns once ``self._keep_running`` is false (checked after every piece
    of data received) or when the response stream ends.

    :param notifications_callback: coro which receives the notification as JSON
    :return: None
    """
    stream_headers = {'Cache-Control': 'no-cache', 'Accept': 'text/event-stream'}
    async with self._http_client.get(uri='get-notifications-json',
                                     extra_headers=stream_headers) as response:
        buffer = b""
        async for data, end_of_http_chunk in response.content.iter_chunks():
            buffer += data
            if end_of_http_chunk:
                try:
                    update_json = json.loads(buffer)
                    if update_json['identifier'] == self._notifications_identifier:
                        self._loop.create_task(notifications_callback(update_json))
                except Exception:
                    # Was a bare ``except:``, which also swallowed
                    # BaseException (KeyboardInterrupt, SystemExit, ...).
                    # Only ordinary errors should be logged and skipped.
                    self._log.exception("Failed to read update")
                # Start collecting the next document from scratch.
                buffer = b""
            if not self._keep_running:
                return
def close(self):
    """
    Stop notification handling and close the HTTP client.

    :return: the task that runs the client's ``close()`` when that call
             produced a coroutine, otherwise the value ``close()`` returned
    """
    # The original called the ``async def stop_handle_notifications()``
    # without awaiting it, so the flag was never cleared.  Clear it directly:
    # this is exactly what that coroutine does.
    self._keep_running = False
    closing = self._http_client.close()
    if asyncio.iscoroutine(closing):
        # A coroutine does nothing until it is scheduled; run it on the
        # handler's loop and let the caller await the task if it wants to.
        return self._loop.create_task(closing)
    return closing
async def stop_handle_notifications(self):
    """
    Clear the ``_keep_running`` flag.

    ``handle_notifications`` checks this flag after each piece of data it
    receives and returns once it is false.
    """
    self._keep_running = False
Add Comment
Please, Sign In to add comment