Guest User

Class

a guest
Mar 24th, 2023
17
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.43 KB | None | 0 0
  1. class OAuthCredentialsStrategy(rest_api.TokenStrategy):
  2. """Strategy class for handling OAuth2 authorization.
  3.  
  4. Parameters
  5. ----------
  6. client : typing.Optional[hikari.snowflakes.SnowflakeishOr[hikari.guilds.PartialApplication]]
  7. Object or ID of the application this client credentials strategy should
  8. authorize as.
  9. client_secret : str
  10. Client secret to use when authorizing.
  11. auth_code : str
  12. Auth code given from Discord when user authorizes
  13. redirect_uri: str
  14. The redirect uri that was included in the authorization request
  15.  
  16. Other Parameters
  17. ----------------
  18. scopes : typing.Sequence[str]
  19. The scopes to authorize for.
  20. """
  21.  
  22. __slots__: typing.Sequence[str] = (
  23. "_client_id",
  24. "_client_secret",
  25. "_exception",
  26. "_expire_at",
  27. "_lock",
  28. "_scopes",
  29. "_token",
  30. "_auth_code",
  31. "_redirect_uri",
  32. "_refresh_token",
  33. )
  34.  
  35. def __init__(
  36. self,
  37. client: snowflakes.SnowflakeishOr[guilds.PartialApplication],
  38. client_secret: str,
  39. auth_code: str,
  40. redirect_uri: str,
  41. *,
  42. scopes: typing.Sequence[typing.Union[applications.OAuth2Scope, str]] = (applications.OAuth2Scope.IDENTIFY,),
  43. ) -> None:
  44. self._client_id = snowflakes.Snowflake(client)
  45. self._client_secret = client_secret
  46. self._exception: typing.Optional[errors.ClientHTTPResponseError] = None
  47. self._expire_at = 0.0
  48. self._lock = asyncio.Lock()
  49. self._scopes = scopes
  50. self._token: typing.Optional[str] = None
  51. self._refresh_token = None
  52. self._auth_code = auth_code
  53. self._redirect_uri = redirect_uri
  54.  
  55. @property
  56. def client_id(self) -> snowflakes.Snowflake:
  57. """ID of the application this token strategy authenticates with."""
  58. return self._client_id
  59.  
  60. def _is_expired(self) -> bool:
  61. return time.monotonic() >= self._expire_at
  62.  
  63. @property
  64. def scopes(self) -> typing.Sequence[typing.Union[applications.OAuth2Scope, str]]:
  65. """Sequence of scopes this token strategy authenticates for."""
  66. return self._scopes
  67.  
  68. @property
  69. def token_type(self) -> applications.TokenType:
  70. return applications.TokenType.BEARER
  71.  
  72. async def acquire(self, client: rest_api.RESTClient) -> str:
  73. if not self._auth_code:
  74. raise RuntimeError("Token has been invalidated. Unable to get current or new token")
  75.  
  76. if self._token and not self._is_expired():
  77. return self._token
  78.  
  79. async with self._lock:
  80. if self._token and not self._is_expired():
  81. return self._token
  82.  
  83. if self._exception:
  84. # If we don't copy the exception then python keeps adding onto the stack each time it's raised.
  85. raise copy.copy(self._exception) from None
  86.  
  87. try:
  88. if not self._token:
  89. response = await client.authorize_access_token(
  90. client=self._client_id,
  91. client_secret=self._client_secret,
  92. code=self._auth_code,
  93. redirect_uri=self._redirect_uri,
  94. )
  95. else:
  96. response = await client.refresh_access_token(
  97. client=self._client_id, client_secret=self._client_secret, refresh_token=self._refresh_token
  98. )
  99.  
  100. except errors.ClientHTTPResponseError as exc:
  101. if not isinstance(exc, errors.RateLimitTooLongError):
  102. # If we don't copy the exception then python keeps adding onto the stack each time it's raised.
  103. self._exception = copy.copy(exc)
  104. raise
  105.  
  106. # Expires in is lowered a bit in-order to lower the chance of a dead token being used.
  107. self._expire_at = time.monotonic() + math.floor(response.expires_in.total_seconds() * 0.99)
  108. self._token = str(response.access_token)
  109. self._refresh_token = response.refresh_token
  110. return self._token
  111.  
  112. def invalidate(self, token: typing.Optional[str] = None) -> None:
  113. if not token or token == self._token:
  114. self._expire_at = 0.0
  115. self._token = None
  116. self._refresh_token = None
  117. self._auth_code = None
  118.  
  119.  
Advertisement
Add Comment
Please, Sign In to add comment