Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from urllib.error import HTTPError
- from urllib.parse import urlparse, urlencode
- from urllib.request import (
- build_opener, Request, HTTPCookieProcessor, urlopen
- )
- import copy
- import hashlib
- import io
- import json
- import logging
- import mimetypes
- import os
- import re
- import time
- import uuid
# Package metadata.
__license__ = "MIT"
__author__ = "Sergei Snegirev <tz4678@gmail.com>"

# Defaults for VKClient's ``timeout``, ``api_delay`` and ``api_version``
# constructor arguments.
TIMEOUT = 15       # seconds; passed as the timeout of every HTTP request
API_DELAY = 0.34   # seconds; minimum pause between two API requests
API_VERSION = 5.33  # sent as the ``v`` parameter of API requests

# User-Agent header installed on the opener VKClient builds when the caller
# does not supply one.
USER_AGENT = "Mozilla/5.0 (vkclient)"
class VKClient:
    """Client for the VK HTTP API (``api.vk.com``).

    API methods can be invoked through :meth:`api` or via attribute access,
    e.g. ``client.users.get(user_ids=1)``; any all-lowercase attribute that
    is not a real attribute is treated as an API section name.
    """

    # Tells URLs apart from local file paths in :meth:`upload`.
    http_re = re.compile('https?://', re.I)

    def __init__(self,
                 access_token=None,
                 user_id=None,
                 expires=None,
                 secret=None,
                 client_id=None,
                 client_secret=None,
                 scope=None,
                 captcha_handler=None,
                 opener=None,
                 api_version=API_VERSION,
                 api_delay=API_DELAY,
                 timeout=TIMEOUT):
        """Store credentials and transport settings.

        :param access_token: token added to API requests when truthy.
        :param user_id: id of the authorized user.
        :param expires: UNIX timestamp at which the token expires.
        :param secret: when truthy, API requests go over plain HTTP and are
            signed with this value (see :meth:`api`).
        :param client_id: application id used by :meth:`auth_direct`.
        :param client_secret: application secret used by :meth:`auth_direct`.
        :param scope: permissions requested by :meth:`auth_direct`.
        :param captcha_handler: object whose ``handle(opener, captcha_img)``
            method returns the captcha solution, or ``None`` to give up.
        :param opener: object with an ``open(request, timeout=...)`` method;
            by default a cookie-aware urllib opener sending ``USER_AGENT``.
        :param api_version: value of the ``v`` request parameter.
        :param api_delay: minimum number of seconds between API requests.
        :param timeout: timeout, in seconds, of every HTTP request.
        """
        self.access_token = access_token
        self.user_id = user_id
        self.expires = expires
        self.secret = secret
        self.client_id = client_id
        self.client_secret = client_secret
        self.scope = scope
        self.captcha_handler = captcha_handler
        if opener is None:
            opener = build_opener(HTTPCookieProcessor())
            opener.addheaders = [('user-agent', USER_AGENT)]
        self.opener = opener
        self.api_version = api_version
        self.api_delay = api_delay
        self.timeout = timeout
        # Timestamp of the last API request; drives the rate limiter in api().
        self.last_api_request = 0
        name = '{}.{}'.format(self.__module__, self.__class__.__name__)
        self.log = logging.getLogger(name)

    def request(self, url, data=None, headers=None):
        """Send an HTTP request and return the JSON-decoded response body.

        :param url: target URL.
        :param data: request body as bytes, or a dict that is form-encoded;
            ``None`` results in a GET request.
        :param headers: optional dict of extra request headers.
        :return: the decoded JSON document.

        HTTP error responses are logged and their body is decoded like a
        normal response, so error documents reach the caller.
        """
        if isinstance(data, dict):
            data = urlencode(data).encode('ascii')
        req = Request(url, data, headers or {})
        try:
            resp = self.opener.open(req, timeout=self.timeout)
        except HTTPError as e:
            self.log.warning("HTTPError: %d %s", e.code, e.msg)
            # HTTPError doubles as a response object carrying the error body.
            resp = e
        try:
            body = resp.read()
            encoding = resp.headers.get_content_charset('utf-8')
        finally:
            # Release the connection even when reading fails.
            resp.close()
        return json.loads(body.decode(encoding))

    def auth_direct(self, username, password):
        """Authorize with a login and password and store the credentials.

        On success sets ``access_token``, ``user_id``, ``expires`` and
        ``secret``. Captcha challenges are passed to :meth:`_handle_captcha`
        and the request is retried with the solution.

        :raises AuthError: when the server reports any other error, or the
            captcha cannot be solved.
        """
        params = dict(username=username,
                      password=password,
                      client_id=self.client_id,
                      client_secret=self.client_secret,
                      scope=self.scope,
                      grant_type="password")
        url = 'https://oauth.vk.com/token'
        while True:
            # NOTE: at DEBUG level this logs the password and client secret.
            self.log.debug("Request: url=%s, params=%s", url, params)
            response = self.request(url, params)
            self.log.debug("Response: %s", response)
            if 'error' not in response:
                break
            error = AuthError(response)
            if error.name != 'need_captcha':
                raise error
            params.update({
                'captcha_key': self._handle_captcha(error),
                'captcha_sid': error.captcha_sid
            })
        self.access_token = response['access_token']
        self.user_id = response['user_id']
        expires_in = response['expires_in']
        # A non-positive lifetime means the token does not expire; reset the
        # field so an expiry left over from a previous token is not kept.
        self.expires = time.time() + expires_in if expires_in > 0 else None
        # If scope contains "nohttps", the response carries a "secret" field.
        self.secret = response.get('secret')

    def auth_mobile(self):
        """Placeholder; always raises :class:`NotImplementedError`."""
        raise NotImplementedError("method not implemented yet")

    def auth_site(self):
        """Placeholder; always raises :class:`NotImplementedError`."""
        raise NotImplementedError("method not implemented yet")

    def auth_server(self):
        """Placeholder; always raises :class:`NotImplementedError`."""
        raise NotImplementedError("method not implemented yet")

    def api(self, method, params=None):
        """Send a request to the API.

        ``access_token`` and ``v`` are filled in from the client unless the
        caller supplies them. Consecutive requests are spaced at least
        ``api_delay`` seconds apart. When ``secret`` is set, the request is
        sent over HTTP and signed with a ``sig`` parameter. Error code 14
        (captcha) is handled via :meth:`_handle_captcha` and retried.

        :param method: method name, e.g. ``"users.get"``.
        :param params: optional dict of request parameters (not modified).
        :return: value of the ``response`` field.
        :raises ApiError: when the server returns an error.
        """
        params = dict(params or {})
        if 'access_token' not in params and self.access_token:
            params['access_token'] = self.access_token
        if 'v' not in params and self.api_version:
            params['v'] = self.api_version
        scheme = 'http' if self.secret else 'https'
        url = '{}://api.vk.com/method/{}'.format(scheme, method)
        while True:
            if self.secret:
                # The signature covers every parameter except itself, so it
                # is recomputed after a captcha retry changes the parameters.
                params.pop('sig', None)
                query = urlencode(params)
                s = '/method/{}?{}{}'.format(method, query, self.secret)
                params['sig'] = hashlib.md5(s.encode('ascii')).hexdigest()
            delay = self.api_delay + self.last_api_request - time.time()
            if delay > 0:
                self.log.debug("Waiting for %.6f sec.", delay)
                self.sleep(delay)
            self.log.debug("Request: url=%s, params=%s", url, params)
            response = self.request(url, params)
            self.log.debug("Response: %s", response)
            self.last_api_request = time.time()
            if 'error' not in response:
                return response['response']
            error = ApiError(response['error'])
            if error.code != 14:
                raise error
            params.update({
                'captcha_key': self._handle_captcha(error),
                'captcha_sid': error.captcha_sid
            })

    def execute(self, code, **kw):
        """Call the ``execute`` API method with ``code`` plus extra params."""
        kw['code'] = code
        return self.api('execute', kw)

    def test_access_token(self):
        """Check ``access_token``.

        Works only with client applications.

        :return: ``False`` if the probe call raises :class:`ApiError`,
            ``True`` otherwise.
        """
        try:
            # Any method that requires an access_token could be called here.
            self.users.get()
        except ApiError:
            return False
        return True

    def upload(self, url, files):
        """Upload files as ``multipart/form-data`` and return the response.

        :param url: upload URL.
        :param files: dict mapping form field names to one of: a local file
            path, an ``http(s)://`` URL to download first, an open stream
            (``io.IOBase``), or a ``(filename, content)`` pair.
        :return: the decoded JSON response.
        :raises ValueError: for a value of any other type.
        :raises UploadError: when the response contains an ``error`` field.
        """
        self.log.debug('Upload files to %s', url)
        boundary = uuid.uuid4().hex
        buf = io.BytesIO()
        for field, value in files.items():
            filename, content = self._read_upload_file(field, value)
            content_type = (mimetypes.guess_type(filename)[0]
                            or 'application/octet-stream')
            part_header = (
                '--{}\r\n'
                'Content-Disposition: form-data; name="{}"; filename="{}"\r\n'
                'Content-Type: {}\r\n\r\n'
            ).format(boundary, field, filename, content_type)
            buf.write(part_header.encode('utf-8'))
            buf.write(content)
            buf.write(b'\r\n')
        buf.write('--{}--\r\n'.format(boundary).encode('utf-8'))
        response = self.request(url, buf.getvalue(), {
            'Content-Type': 'multipart/form-data; boundary=' + boundary
        })
        self.log.debug("Response: %s", response)
        if 'error' in response:
            raise UploadError(response['error'])
        return response

    def _read_upload_file(self, field, value):
        """Return ``(filename, content_bytes)`` for one ``upload`` entry."""
        if isinstance(value, str):
            if self.http_re.match(value):
                # Remote file: download it, bounded by the client timeout,
                # and close the connection afterwards.
                with urlopen(value, timeout=self.timeout) as resp:
                    content = resp.read()
                filename = os.path.basename(urlparse(value).path)
            else:
                with open(value, 'rb') as fp:
                    content = fp.read()
                filename = os.path.basename(value)
            return filename, content
        if isinstance(value, io.IOBase):
            # In-memory streams have no ``name`` and fd-backed files have an
            # integer one; fall back to the form field name when the stream
            # yields no usable file name.
            filename = os.path.basename(str(getattr(value, 'name', '')))
            filename = filename or field
            content = value.read()
        elif isinstance(value, (list, tuple)):
            filename, content = value
        else:
            raise ValueError("bad value: {!r}".format(value))
        if not isinstance(content, bytes):
            content = str(content).encode('utf-8')
        return filename, content

    def sleep(self, seconds):
        """Pause for ``seconds``; a separate method so it can be overridden."""
        time.sleep(seconds)

    def _handle_captcha(self, error):
        """Return the captcha solution for ``error`` or re-raise ``error``.

        The error is raised when no handler is configured or the handler
        returns ``None``.
        """
        if self.captcha_handler:
            solution = self.captcha_handler.handle(self.opener,
                                                   error.captcha_img)
            if solution is not None:
                return solution
        raise error

    def copy(self, **kw):
        """Return a shallow copy of the client with attributes overridden.

        :param kw: attribute values to set on the copy.
        """
        clone = copy.copy(self)
        clone.__dict__.update(kw)
        return clone

    def __getattr__(self, name):
        """Expose API sections (users, auth, wall, etc.) as attributes."""
        # Only all-lowercase names are sections; anything else (dunder or
        # private names in particular) is a genuinely missing attribute.
        if re.match('[a-z]+$', name):
            return _ComplexMethod(self, name)
        raise AttributeError(name)
class VKError(Exception):
    """Base class of every error raised by this module."""


class AuthError(VKError):
    """Error document returned by the authorization endpoint.

    Attributes: ``name`` (the ``error`` field), ``description``,
    ``captcha_img``, ``captcha_sid`` and ``redirect_uri``; the optional ones
    are ``None`` when absent from the document.
    """

    def __init__(self, details):
        """Build the error from the decoded response dict ``details``."""
        if 'error_description' in details:
            message = "{error_description} ({error})".format(**details)
        else:
            message = details['error']
        super().__init__(message)
        self.name = details['error']
        self.description = details.get('error_description')
        self.captcha_img = details.get('captcha_img')
        self.captcha_sid = details.get('captcha_sid')
        self.redirect_uri = details.get('redirect_uri')


class ApiError(VKError):
    """Error object returned by an API method call.

    Attributes: ``code``, ``msg``, ``captcha_img``, ``captcha_sid``,
    ``redirect_uri``, plus ``method``, ``oauth`` and ``params`` taken from
    the echoed ``request_params`` list.
    """

    def __init__(self, details):
        """Build the error from the ``error`` object ``details``.

        ``error_code`` and ``error_msg`` are required. ``request_params`` is
        optional: when it, or its ``method``/``oauth`` entries, are missing,
        the matching attributes are ``None`` and ``params`` holds whatever
        remains, so that building the exception never hides the API error
        behind a ``KeyError``.
        """
        super().__init__("{error_code} - {error_msg}".format(**details))
        self.code = details['error_code']
        self.msg = details['error_msg']
        self.captcha_img = details.get('captcha_img')
        self.captcha_sid = details.get('captcha_sid')
        self.redirect_uri = details.get('redirect_uri')
        echoed = details.get('request_params') or ()
        params = {item['key']: item['value'] for item in echoed}
        self.method = params.pop('method', None)
        self.oauth = params.pop('oauth', None)
        self.params = params


class UploadError(VKError):
    """Raised when an upload response contains an ``error`` field."""
class _ComplexMethod:
    """One API section (e.g. ``users``) whose attributes are API methods.

    ``section.someMethod(...)`` calls ``client.api('<prefix>.someMethod',
    params)`` and returns its result.
    """

    # A method name is a lowercase letter followed by letters only. Runs of
    # capitals are allowed (e.g. ``getSMSHistory``); names with underscores,
    # such as dunder lookups, are rejected.
    _method_re = re.compile(r'[a-z][A-Za-z]*')

    def __init__(self, client, prefix):
        """
        :param client: object providing ``api(method, params)``.
        :param prefix: section name prepended to every method name.
        """
        self._client = client
        self._prefix = prefix

    def __getattr__(self, name):
        """Return a callable invoking API method ``<prefix>.<name>``.

        The callable accepts a dict (or anything ``dict()`` accepts) as its
        first positional argument and/or keyword parameters; keyword
        parameters win over entries of the positional dict.

        :raises AttributeError: when ``name`` is not a valid method name.
        """
        # e.g.: users.isAppUser
        if not self._method_re.fullmatch(name):
            raise AttributeError(name)
        method = '{}.{}'.format(self._prefix, name)

        def wrapper(*arg, **kw):
            # If a parameter name clashes with a Python keyword, the caller
            # prefixes it with an underscore, which is stripped here:
            #   <object VKClient>.storage.get(_global=1, ...)
            params = dict(arg[0]) if arg else {}
            for key, value in kw.items():
                if len(key) > 1 and key.startswith('_'):
                    key = key[1:]
                params[key] = value
            return self._client.api(method, params)

        return wrapper
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement