Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import hashlib
- import io
- import json
- import logging
- import os
- import platform
- import re
- import socket
- import time
- import urllib.parse
- import urllib.request
- import uuid
- __author__ = "Sergei Snegirev (tz4678@gmail.com)"
- __copyright__ = "Copyright (C) 2013-2016 Sergei Snegirev"
- __license__ = "MIT"
- __version__ = "3.0"
- __url__ = "https://github.com/***/vkapi/"
- __all__ = ('Client', 'ObjectDict', 'AccessToken', 'Permissions',
- 'PERMISSION_LIST', 'PERMISSION_STR', 'permissions2int',
- 'permissions2str', 'ClientError', 'ApiError', 'AuthError',
- 'UploadError')
- HTTP_RE = re.compile('https?://', re.I)
- USER_AGENT_FORMAT = "Mozilla/5.0 (vkapi.Client/{ver} Python/{py_ver} +{url})"
- USER_AGENT = USER_AGENT_FORMAT.format(ver=__version__,
- py_ver=platform.python_version(),
- url=__url__)
- class Client:
- api_version = 5.44
- api_delay = 0.34
- api_host = "api.vk.com"
- api_path = "/method/"
- auth_host = "oauth.vk.com"
- auth_path = "token"
- timeout = 15
- retries = 3
- def __init__(self,
- access_token=None,
- client_id=None,
- client_secret=None,
- scope=None,
- username=None,
- password=None,
- api_version=None,
- api_delay=None,
- api_host=None,
- api_path=None,
- auth_host=None,
- auth_path=None,
- https=None,
- lang=None,
- opener=None,
- retries=None,
- timeout=None):
- """Клиент для работы с API Вконтакте.
- + Нативная поддержка методов API
- + Прямая авторизация
- + Работа с подписью
- + Загрузка файлов
- :param access_token: Токен полученный при авторизации
- :type access_token: AccessToken instance
- :param client_id: Id приложения
- :type client_id: int
- :param client_secret: Секретный ключ приложения
- :type client_secret: str
- :param scope: Права доступа приложения
- :type scope: str or int
- :param username: Имя пользователя
- :type username: str
- :param password: Пароль
- :type password: str
- :param api_version: Версия API
- :type api_version: float
- :param api_delay: Задержка между вызовами методов API. Существуют
- ограничения на количество обращений в секунду к методам API
- :type api_delay: int or float
- :param api_host:
- :type api_host: str
- :param api_path:
- :type api_path: str
- :param auth_host:
- :type auth_host: str
- :param auth_path:
- :type auth_path: str
- :param https: Если равен 1, то методы API возвращают https ссылки на
- фотографии и медиа файлы. Работает только если запросы
- осуществляются через https.
- :type https: int
- :param lang: Используемый язык (ru,ua,be,es,fi,de,it)
- :type lang: str
- :param opener: Используется для отправки запросов
- :type opener: urllib.request.OpenerDirector instance
- :param retries: Количество повторных запросов в случае ошибки таймаута.
- Если значение равно -1, то запросы будут посылаться, пока не придет
- ответ.
- :type retries: int
- :param timeout: Таймаут
- :type timeout: int or float
- """
- name = '{}.{}'.format(self.__module__, self.__class__.__name__)
- self.logger = logging.getLogger(name)
- self.access_token = access_token
- self.client_id = client_id
- self.client_secret = client_secret
- self.scope = scope
- self.username = username
- self.password = password
- self.api_version = api_version or self.api_version
- self.api_delay = api_delay or self.api_delay
- self.api_host = api_host or self.api_host
- self.api_path = api_path or self.api_path
- self.auth_host = auth_host or self.auth_host
- self.auth_path = auth_path or self.auth_path
- self.lang = lang
- self.https = https
- if not opener:
- # Включаем поддержку cookies и используем системные прокси
- opener = urllib.request.build_opener(
- urllib.request.HTTPCookieProcessor(),
- urllib.request.ProxyHandler())
- opener.addheaders = [('User-Agent', USER_AGENT)]
- self.opener = opener
- self.timeout = timeout or self.timeout
- self.retries = retries or self.retries
- #
- self.api = Api(self)
- self.last_api_request = 0
- @property
- def is_authorized(self):
- return self.access_token is not None
- def api_request(self, method, params=None):
- """Делает запрос к API.
- Список методов API: <https://vk.com/dev/methods>
- Подробнее про запросы к API: <https://vk.com/dev/api_requests>
- Пример:
- c = Client()
- r = c.api_request('users.get', {'user_id': 1})
- print("{u.first_name} {u.last_name}".format(u=r[0]))
- Использовать данный метод нет необходимости, так как к API можно
- обращаться "нативно":
- r = c.api.users.get(user_id=1)
- # либо так
- r = c.api.users.get({'user_id': 1})
- # или так
- r = c.api.users.get({'fields': 'photo'}, user_id=1)
- Это намного короче чем вызов api_request.
- :param method: Метод API
- :type method: str
- :param params: Передаваемые параметры
- :type params: dict
- :return: Возвращает результат запроса
- """
- params = dict(params or [])
- if self.is_authorized:
- params['access_token'] = self.access_token.value
- # Позволяем в запросе выбрать версию API. Возможно, потребуется для
- # обращения к каким-нибудь depractated методам
- if 'v' not in params and self.api_version:
- params['v'] = self.api_version
- if 'lang' not in params and self.lang:
- params['lang'] = self.lang
- if 'https' not in params and self.https:
- params['https'] = self.https
- path = urllib.parse.urljoin(self.api_path, method)
- # Кстати, через http ответы сервера приходят заметно быстрее
- # <https://vk.com/dev/api_nohttps>
- if hasattr(self.access_token, "secret") and self.access_token.secret:
- scheme = 'http'
- # Исправлен баг с неверной подписью. См. <path/to/lib/notes.txt>.
- # Нам важно чтобы порядок элементов словаря сохранился.
- # Добавим новый элемент
- params['sig'] = ''
- # Сформировали query string из словаря
- query_string = urllib.parse.urlencode(params)
- # А теперь вырежем параметр sig
- query_string = re.sub('^sig=&|&sig=', '', query_string)
- uri = '{}?{}{}'.format(path, query_string,
- self.access_token.secret)
- params["sig"] = hashlib.md5(uri.encode('ascii')).hexdigest()
- else:
- scheme = 'https'
- # !!! params не должен изменяться после добавления sig
- url = '{}://{}{}'.format(scheme, self.api_host, path)
- payload = bytes(urllib.parse.urlencode(params), 'ascii')
- delay = self.api_delay + self.last_api_request - time.time()
- if delay > 0:
- self.logger.debug("Wait %dms", delay * 1000)
- time.sleep(delay)
- self.logger.debug("Call method %r with parameters: %s", method, params)
- try:
- result = self.parse_json_response(self.http_request(url, payload))
- except urllib.request.HTTPError as e:
- self.logger.debug(e)
- result = self.parse_json_response(e)
- self.last_api_request = time.time()
- if 'error' in result:
- error = ApiError(result.error)
- else:
- error = None
- response = result.get('response')
- return self.process_api_response(response, error, method, params)
- def process_api_response(self, response, error, method, params):
- """Метод для переопределения."""
- if error:
- if error.code is ApiError.CAPTCHA_NEEDED:
- captcha_key = self.get_captcha_key(error.captcha_img)
- params.update({'captcha_key': captcha_key,
- 'captcha_sid': error.captcha_sid})
- return self.api_request(method, params)
- raise error
- return response
- def get_captcha_key(self, captcha_img):
- raise NotImplementedError
- def test_access_token(self):
- try:
- return self.api.execute(code="return true;")
- except ApiError:
- return False
- def authorize(self,
- username=None,
- password=None,
- client_id=None,
- client_secret=None,
- scope=None,
- **kwargs):
- """Метод для прямой авторизации <http://vk.com/dev/auth_direct>."""
- if username:
- self.username = username
- if password:
- self.password = password
- if client_id:
- self.client_id = client_id
- if client_secret:
- self.client_secret = client_secret
- if scope:
- self.scope = scope
- # Обязательные параметры
- if not self.username:
- raise ClientError("username is not set")
- if not self.password:
- raise ClientError("password is not set")
- if not self.client_id:
- raise ClientError("client_id is not set")
- if not self.client_secret:
- raise ClientError("client_secret is not set")
- params = dict(grant_type="password",
- client_id=self.client_id,
- client_secret=self.client_secret,
- username=self.username,
- password=self.password,
- v=self.api_version)
- if self.scope:
- params['scope'] = self.scope
- if kwargs:
- params.update(kwargs)
- self.auth_request(params)
- def auth_request(self, params):
- params = dict(params)
- url = urllib.parse.urljoin("https://" + self.auth_host, self.auth_path)
- url = "{}?{}".format(url, urllib.parse.urlencode(params))
- try:
- result = self.parse_json_response(self.http_request(url))
- except urllib.request.HTTPError as e:
- self.logger.debug(e)
- result = self.parse_json_response(e)
- if 'error' in result:
- response = None
- error = AuthError(result)
- else:
- response = result
- error = None
- self.process_auth_response(response, error, params)
- def process_auth_response(self, response, error, params):
- """Метод для переопределения."""
- if error:
- # Если требуется ввод капчи
- if error.type is AuthError.NEED_CAPTCHA:
- captcha_key = self.get_captcha_key(error.captcha_img)
- params.update({"captcha_key": captcha_key,
- "captcha_sid": error.captcha_sid})
- # Пробуем авторизоваться снова
- self.auth_request(params)
- else:
- raise error
- else:
- self.create_auth_token(response)
- self.logger.info("Successfully authorized")
- def create_auth_token(self, response):
- self.access_token = AccessToken(value=response.access_token,
- user_id=response.get('user_id'),
- expires=response.get('expires_in'),
- secret=response.get('secret'))
- # При прямой авторизации всегда равен 0
- if self.access_token.expires:
- self.access_token.expires += time.time()
- def upload(self, server_url, files):
- """
- Загружает файлы на сервер.
- :param server_url: Адрес сервера
- :type server_url: str
- :param files: Словарь где ключами выступают имена полей. Значения могут
- быть строками (путь до файла либо ссылка на него), файловыми
- объектами либо списками/кортежами,где первый элемент - имя файла,
- а второй - его содержимое (бинарное либо текстовое)
- :type files: dict
- :rtype: ObjectDict
- """
- boundary = uuid.uuid4().hex
- buf = io.BytesIO()
- for name, obj in files.items():
- if isinstance(obj, str):
- # Загружаем по URL
- if HTTP_RE.match(obj):
- content = self.get_content(obj)
- # Всякие image.php?id=foo будут забракованы
- filename = os.path.basename(obj)
- # Читаем файл
- else:
- with open(obj, 'rb') as fp:
- content = fp.read()
- filename = os.path.basename(obj)
- else:
- # {'photo': open("path/to/photo.jpg", 'rb')}
- if isinstance(obj, io.IOBase):
- filename = os.path.basename(obj.name)
- content = obj.read()
- # {'photo': ('photo.jpg', b'<binary data>')}
- elif isinstance(obj, (list, tuple)):
- # нам нужны только два первых элемента остальные будем
- # игнорировать
- filename, content, *_ = obj
- else:
- raise ValueError("Bad Value: {!r}".format(obj))
- if not isinstance(content, (bytearray, bytes)):
- content = str(content).encode('utf-8')
- # Поле Content-Type передавать необязательно, на сервере Вконтакте
- # проверяется расширение файла из поля filename и его содержиме,
- # читаются первые n байтов, и ищутся сигнатуры
- headers_str = ('--{}\r\n'
- 'Content-Disposition: form-data; name="{}"; '
- 'filename="{}"\r\n\r\n')
- # У requests кстати баг, который заставил отказаться от его
- # использования: он неправильно кодирует non-ascii поля в
- # заголовках, поэтому файл "Мой файл.txt" в документах будет
- # отображаться как "???....txt"
- headers_str = headers_str.format(boundary, name, filename)
- buf.write(bytes(headers_str, 'utf-8'))
- buf.write(content)
- buf.write(b'\r\n')
- buf.write(bytes('--{}--\r\n'.format(boundary), 'utf-8'))
- response = self.http_request(server_url, buf.getvalue(), {
- 'Content-Type': 'multipart/form-data; boundary=' + boundary})
- result = self.parse_json_response(response)
- if 'error' in result:
- raise UploadError(result.error)
- return result
- def parse_json(self, content):
- return json.loads(content, object_hook=ObjectDict)
- def parse_json_response(self, response):
- return self.parse_json(str(response.read(), "utf-8"))
- def get_content(self, url):
- return self.http_request(url).read()
- def http_request(self, url, body=None, headers={}):
- start_time = time.time()
- self.logger.debug("Open %s", url)
- request = urllib.request.Request(url, body, headers)
- attempts = 0
- while True:
- try:
- response = self.opener.open(request, timeout=self.timeout)
- request_time = time.time() - start_time
- self.logger.debug("Total Request Time: %dms",
- request_time * 1000)
- return response
- except urllib.request.URLError as e:
- if isinstance(e.reason, socket.timeout):
- # Если значение равно -1, то запросы будут посылаться, пока
- # не придет ответ
- if self.retries != -1:
- attempts += 1
- if attempts > self.retries:
- raise
- else:
- raise
- class Api:
- def __init__(self, client):
- self._client = client
- def __getattr__(self, name):
- if ApiMethod.NAME_RE.match(name):
- return ApiMethod(self, name)
- raise AttributeError(name)
- class ApiMethod:
- NAME_RE = re.compile("[a-z]+([A-Z][a-z]+)*$")
- def __init__(self, client, method):
- self._client = client
- self._method = method
- def __getattr__(self, name):
- if self.NAME_RE.match(name):
- return ApiMethod(self._client, '.'.join([self._method, name]))
- raise AttributeError(name)
- def __call__(self, *arg, **kw):
- # Если имя именованного параметра совпадает с ключевым словом, то
- # добавляем перед именем подчеркивание:
- #
- # client.storage.set(_global=1, key='foo', value='bar')
- params = {k[1:] if len(k) > 1 and k.startswith('_')
- else k: v for k, v in kw.items()}
- if len(arg):
- # Первым аргументом является словарь
- d = dict(arg[0])
- # Если переданы именованные параметры, то обновляем словарь
- d.update(params)
- params = d
- return self._client.api_request(self._method, params)
- class ObjectDict(dict):
- def __getattr__(self, name):
- try:
- return self[name]
- except KeyError:
- raise AttributeError(name)
- def __setattr__(self, name, value):
- self[name] = value
- def __repr__(self):
- return "{}({})".format(self.__class__.__name__, super().__repr__())
- class AccessToken:
- def __init__(self, value, expires=None, user_id=None, secret=None):
- self.value = value
- self.expires = expires
- self.user_id = user_id
- self.secret = secret
- def is_expired(self):
- return self.expires and time.time() > self.expires
- def __repr__(self):
- s = "{}(value={!r}, expires={!r}, user_id={!r}, secret={!r})"
- return s.format(self.__class__.__name__, self.value, self.expires,
- self.user_id, self.secret)
- # Мы используем класс вместо обычных констант, потому как мы не можем получить
- # локальные переменные модуля (наши константы) так же просто:
- #
- # print(vars(Permissions))
- #
- # c = Client(
- # ...
- # scope=Permissions.FRIENDS | Permissions.VIDEO | Permissions.OFFLINE
- # )
- #
- class Permissions:
- """Подробнее про права приложения можно прочитать по ссылке
- <https://vk.com/dev/permissions>
- """
- NOTIFY = 1
- FRIENDS = 2
- PHOTOS = 4
- AUDIO = 8
- VIDEO = 16
- DOCS = 131072
- NOTES = 2048
- PAGES = 128
- STATUS = 1024
- OFFERS = 32
- QUESTIONS = 64
- WALL = 8192
- GROUPS = 262144
- MESSAGES = 4096
- EMAIL = 4194304
- NOTIFICATIONS = 524288
- STATS = 1048576
- ADS = 32768
- MARKET = 134217728
- OFFLINE = 65536
- ALL = 140492031
- PERMISSION_LIST = [v.lower() for v in dir(Permissions)
- if re.match('[A-Z]+$', v) and v != 'ALL']
- PERMISSIONS_STR = ",".join(PERMISSION_LIST)
- def permissions2int(s):
- """Преобразует строку с правами доступа приложения в число
- >>> permissions2int('friends,video,offline')
- 65554
- """
- mask = 0
- for v in s.split(','):
- # Для сервера все равно передаем мы notify или NoTiFy
- if v.lower() not in PERMISSION_LIST:
- raise ValueError("Unknown permission: {}".format(v))
- mask |= getattr(Permissions, v.upper())
- return mask
- def permissions2str(mask):
- """Преобразует числовое представление прав доступа приложения в строку
- >>> permissions2str(65554)
- 'friends,video,offline'
- """
- flags = mask
- out = []
- # Элементы в списке отсортированы по алфавиту
- for v in PERMISSION_LIST:
- flag = getattr(Permissions, v.upper())
- if flag & flags == flag:
- # Сбрасываем флаг. Правильно так делать flags &= ~flag, но это в
- # случае, если неизвестно установлен флаг или нет
- # 110 ^ 001 = 111
- # 111 ^ 001 = 110
- # 110 & ~001 = 110
- # 111 & ~001 = 110
- flags ^= flag
- out.append(v)
- if flags != 0:
- raise ValueError("Bad mask: {}".format(mask))
- return ','.join(out)
- class ClientError(Exception):
- pass
- class ApiError(ClientError):
- """Подробнее про ошибки API можно прочитать по ссылке
- <https://vk.com/dev/errors>
- """
- UNKNOWN_ERROR_OCCURRED = 1
- UNKNOWN_METHOD_PASSED = 3
- INCORRECT_SIGNATURE = 4
- USER_AUTHORIZATION_FAILED = 5
- TOO_MANY_REQUESTS_PER_SECOND = 6
- INVALID_REQUEST = 8
- FLOOD_CONTROL = 9
- INTERNAL_SERVER_ERROR = 10
- CAPTCHA_NEEDED = 14
- ACCESS_DENIED = 15
- HTTP_AUTHORIZATION_FAILED = 16
- VALIDATION_REQUIRED = 17
- THIS_METHOD_WAS_DISABLED = 23
- CONFIRMATION_REQUIRED = 24
- INVALID_APPLICATION_API_ID = 101
- INVALID_USER_ID = 113
- INVALID_TIMESTAMP = 150
- ACCESS_TO_ALBUM_DENIED = 200
- ACCESS_TO_AUDIO_DENIED = 201
- ACCESS_TO_GROUP_DENIED = 203
- THIS_ALBUM_IS_FULL = 300
- SOME_ADS_ERROR_OCCURED = 603
- def __init__(self, data):
- self.msg = data.error_msg
- self.code = data.error_code
- self.captcha_img = data.get('captcha_img')
- self.captcha_sid = data.get('captcha_sid')
- self.redirect_uri = data.get('redirect_uri')
- self.params = params = {
- v['key']: v['value'] for v in data['request_params']}
- self.method = params.pop('method')
- self.oauth = params.pop('oauth')
- def __str__(self):
- s = ("{} (Error Code: {}). An error occured while calling method {!r} "
- "with parameters: {}")
- return s.format(self.msg, self.code, self.method, self.params)
- class AuthError(ClientError):
- NEED_CAPTCHA = 'need_captcha'
- NEED_VALIDATON = 'need_validation'
- def __init__(self, data):
- self.type = data.error
- # Ошибка капчи не возвращает описание
- self.description = data.get('error_description')
- self.captcha_img = data.get('captcha_img')
- self.captcha_sid = data.get('captcha_sid')
- self.redirect_uri = data.get('redirect_uri')
- def __str__(self):
- if not self.description:
- return self.type
- return "{}: {}".format(self.type, self.description)
- class UploadError(ClientError):
- pass
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement