Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from .access_token import AccessToken
- import logging
- import re
- import urllib.parse
- AUTH_BASE = "https://oauth.vk.com"
- REDIRECT_URI = AUTH_BASE + "/blank.html"
- class AuthMixin:
- def get_auth_url(self, params):
- return "{}?{}".format(
- urllib.parse.urljoin(AUTH_BASE, self.auth_path),
- urllib.parse.urlencode(params)
- )
- class DirectAuth(AuthMixin):
- """Осуществляет прямую авторизацию <https://vk.com/dev/auth_direct>"""
- auth_path = 'token'
- def __init__(self,
- client,
- client_id,
- client_secret,
- scope=None,
- username=None,
- password=None,
- test_redirect_uri=None):
- self.logger = logging.getLogger(
- ".".join([self.__class__.__module__, self.__class__.__name__]))
- self.client = client
- self.client_id = client_id
- self.client_secret = client_secret
- self.scope = scope
- self.username = username
- self.password = password
- self.test_redirect_uri = test_redirect_uri
- def authorize(self,
- username=None,
- password=None,
- captcha_key=None,
- captcha_sid=None):
- self.username = username or self.username
- self.password = password or self.password
- if not self.username or not self.password:
- raise AuthError("Username and password are required")
- params = self.get_auth_params()
- if captcha_key:
- assert captcha_sid is not None
- params['captcha_key'] = captcha_key
- params['captcha_sid'] = captcha_sid
- url = self.get_auth_url(params)
- self.logger.debug("Url: %s", url)
- response = self.client.get(url)
- if 'error' in response:
- raise OAuthError(response)
- self.client.access_token = AccessToken(
- key=response.access_token,
- user_id=response.user_id,
- expires_in=response.expires_in,
- secret=response.get('secret')
- )
- self.logger.info("Successfully authorized")
- def get_auth_params(self):
- params = dict(
- grant_type='password',
- client_id=self.client_id,
- client_secret=self.client_secret,
- username=self.username,
- password=self.password,
- v=self.client.api_version
- )
- if self.scope:
- params['scope'] = self.scope
- if self.test_redirect_uri:
- params['test_redirect_uri']
- return params
- class ClientAuth(AuthMixin):
- """Осуществляет авторизацию Standalone приложения
- <https://vk.com/dev/auth_mobile>"""
- auth_path = 'authorize'
- def __init__(self,
- client,
- client_id,
- scope=None,
- username=None,
- password=None,
- display=None,
- redirect_uri=REDIRECT_URI):
- self.logger = logging.getLogger(
- ".".join([self.__class__.__module__, self.__class__.__name__]))
- self.client = client
- self.client_id = client_id
- self.scope = scope
- self.username = username
- self.password = password
- self.redirect_uri = redirect_uri
- self.display = display
- def authorize(self, username=None, password=None):
- self.username = username or self.username
- self.password = password or self.password
- if not self.username or not self.password:
- raise AuthError("Username and password are required")
- params = self.get_auth_params()
- r = self.client.session.get(self.get_auth_url(params))
- action = self.get_form_action(r.text)
- params = self.get_login_form_params(r.text)
- self.submit_login_form(action, params)
- def submit_login_form(self, action, params):
- params['email'] = self.username
- params['pass'] = self.password
- r = self.client.session.post(action, params)
- self.logger.debug("Current URL: %s", r.url)
- if r.url.find('#'):
- # Копипаст
- h = r.url.split("#")[1]
- res = dict(urllib.parse.parse_qsl(h))
- assert 'access_token' in res
- self.client.access_token = AccessToken(
- res['access_token'],
- int(res['user_id']),
- int(res['expires_in']),
- res.get('secret')
- )
- return
- ga_match = re.search(
- 'https://login\.vk\.com/\?act=grant_access([^"]+)', r.text)
- # Страница с подтверждением, где кнопочку Allow нужно нажать
- if ga_match:
- ga_url = ga_match.group(0)
- r = self.client.session.get(ga_url)
- h = r.url.split("#")[1]
- res = dict(urllib.parse.parse_qsl(h))
- if 'error' in res:
- raise AuthError(res.get('error_description') or res['error'])
- assert 'access_token' in res
- self.client.access_token = AccessToken(
- res['access_token'],
- int(res['user_id']),
- int(res['expires_in']),
- res.get('secret')
- )
- return
- captcha_match = re.search(
- '([^"]+/captcha\.php\?sid=(\d+)[^"]+)', r.text)
- if captcha_match:
- captcha_url, captcha_sid = captcha_match.groups()
- try:
- captcha_key = self.get_captcha_key(captcha_url)
- except NotImplementedError:
- raise AuthError("Captcha required")
- params['captcha_sid'] = captcha_sid
- params['captcha_key'] = captcha_key
- # Хз можно, наверное, закомментировать. Все хеши к сессии привязаны
- params = self.get_login_form_params(r.text)
- self.submit_login_form(action, params)
- return
- error_match = re.search(
- '(warning|error)">([^<]+)', r.text)
- if error_match:
- raise AuthError(error_match.group(2))
- raise AuthError("WTF?")
- def get_captcha_key(self, url):
- raise NotImplementedError
- def get_form_action(self, content):
- match = re.search('action="([^"]+)', content)
- return match.group(1)
- def get_login_form_params(self, content):
- matches = re.findall(
- '(_origin|ip_h|lg_h|to|expire)" value="([^"]+)', content)
- return dict(matches)
- def get_auth_params(self):
- params = dict(
- client_id=self.client_id,
- redirect_uri=self.redirect_uri,
- v=self.client.api_version,
- response_type='token'
- )
- if self.scope:
- params['scope'] = self.scope
- if self.display:
- params['display'] = self.display
- return params
- class AuthError(Exception):
- pass
- class OAuthError(AuthError):
- def __init__(self, response):
- self.error_type = response['error']
- self.error_description = response.get('error_description')
- self.captcha_img = response.get('captcha_img')
- self.captcha_sid = response.get('captcha_sid')
- self.redirect_uri = response.get('redirect_uri')
- def __str__(self):
- if self.error_description:
- return ": ".join([self.error_type, self.error_description])
- return self.error_type
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement