Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # -*- coding: utf-8 -*-
- # @author: noname <tz4678@gmail.com>
- # @description: модуль для работы с API Вконтакте
- # Полный список методов API можно увидеть здесь https://vk.com/dev/methods
- import sys
- import urllib
- import json
- import urllib2
- import uuid
- import os
- import mimetypes
- class NeedCaptchaError(Exception):
- def __init__(self, captcha_img, captcha_sid):
- super(NeedCaptchaError, self).__init__('need captcha')
- self.captcha_img = captcha_img
- self.captcha_sid = captcha_sid
- class VkAuthError(Exception):
- pass
- class VkApiError(Exception):
- pass
- class VkUploadError(Exception):
- pass
- class VkClient(object):
- def __init__(self, client_id, client_secret, permissions, api_version):
- self.client_id = client_id
- self.client_secret = client_secret
- self.permissions = permissions
- self.api_version = api_version
- self.user_agent = 'VkClient (Python %s.%s.%s)' % sys.version_info[:3]
- self.user_id = None
- self.access_token = None
- def fetch_json(self, url, data = None, headers = {}):
- headers.setdefault('User-Agent', self.user_agent)
- request = urllib2.Request(url, data, headers)
- response = urllib2.urlopen(request);
- content = response.read().decode('utf-8')
- return json.loads(content)
- def auth(self, username, password, **params):
- defaults = {
- 'username': username,
- 'password': password,
- 'client_id': self.client_id,
- 'client_secret': self.client_secret,
- 'scope': self.permissions,
- 'v': self.api_version,
- 'grant_type': 'password'
- }
- params.update(defaults)
- result = self.fetch_json('https://oauth.vk.com/token?' + urllib.urlencode(params))
- if 'error' in result:
- if result['error'] == 'need_captcha':
- # raise NeedCaptchaError(result['captcha_img'], result['captcha_sid'])
- raise VkAuthError('%s. Error Description: %s' % (result['error'], result.get('error_description', '')))
- self.user_id = result['user_id']
- self.access_token = result['access_token']
- def raw_api(self, uri, params = {}):
- headers = {}
- if len(params):
- for k, v in params.items():
- if type(v) == unicode:
- params[k] = v.encode('utf-8')
- headers['Content-Type'] = 'application/x-www-form-urlencoded'
- data = urllib.urlencode(params) if len(params) else None
- result = self.load_json('https://api.vk.com/method/' + uri, data, headers)
- if 'error' in result:
- error = result['error']
- if error['error_code'] == 14:
- # raise NeedCaptchaError(error['captcha_img'], error['captcha_sid'])
- raise VkApiError(error['error_msg'])
- return result['response']
- def api(self, uri, params = {}):
- params.update({
- 'access_token': self.access_token,
- 'v': self.api_version
- })
- return self.raw_api(uri, params)
- def escape_quote(self, s):
- return s.replace('"', '\\"')
- # VkClient().upload(server_url, {'photo': filename})
- def upload(self, server_url, files):
- boundary = uuid.uuid4().hex
- L = []
- for fieldname, filename in files.items():
- extension = os.path.splitext(filename)[1]
- L.extend([
- '--' + boundary,
- 'Content-Disposition: form-data; name="%s"; filename="%s"' % (self.escape_quote(fieldname), os.path.basename(filename)),
- 'Content-Type: ' + (mimetypes.types_map[extension] if extension in mimetypes.types_map else 'application/octet-stream'),
- '',
- open(filename, 'rb').read()
- ])
- L.extend([
- '--' + boundary + '--',
- ''
- ])
- headers = {
- 'Content-Type': 'multipart/form-data; boundary="%s"' % boundary,
- }
- result = self.fetch_json(server_url, '\r\n'.join(L), headers)
- if 'error' in result:
- raise VkUploadError(result['error'])
- return result
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement