Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # -*- coding: utf-8 -*-
- # @author: noname <tz4678@gmail.com>
- # @description: модуль для работы с API Вконтакте
- # Полный список методов API можно увидеть здесь https://vk.com/dev/methods
- import sys
- import json
- import urllib2
- import urllib
- import uuid
- import os
- import mimetypes
- class VkNeedCaptcha(Exception):
- pass
- class VkAuthError(Exception):
- pass
- class VkAuthFailed(Exception):
- pass
- class VkApiError(Exception):
- pass
- class VkUploadError(Exception):
- pass
- class VkClient(object):
- def __init__(self, client_id, client_secret, permissions, api_version):
- self.client_id = client_id
- self.client_secret = client_secret
- self.permissions = permissions
- self.api_version = api_version
- self.user_agent = 'VkClient (Python %s.%s.%s)' % sys.version_info[:3]
- self.user_id = None
- self.access_token = None
- def fetch_json(self, url, data = None, headers = {}):
- url = str(url)
- headers.setdefault('User-Agent', self.user_agent)
- request = urllib2.Request(url, data, headers)
- response = urllib2.urlopen(request);
- content = response.read().decode('utf-8')
- # print content
- return json.loads(content)
- # переопределяем метод при необходимости
- def captcha_handler(self, url):
- raise VkNeedCaptcha('Captcha is needed')
- def captcha_params(self, params, response):
- params.update({
- 'captcha_key': self.captcha_handler(response['captcha_img']),
- 'captcha_sid': response['captcha_sid']
- })
- def _auth(self, params):
- defaults = {
- 'client_id': self.client_id,
- 'client_secret': self.client_secret,
- 'scope': self.permissions,
- 'v': self.api_version,
- 'grant_type': 'password'
- }
- defaults.update(params)
- params = defaults
- result = self.fetch_json('https://oauth.vk.com/token?' + urllib.urlencode(params))
- if 'error' in result:
- if result['error'] == 'need_captcha':
- self.captcha_params(params, result)
- return self._auth(params)
- raise VkAuthError('%s. Error Description: %s' % (result['error'], result.get('error_description', '')))
- self.user_id = result['user_id']
- self.access_token = result['access_token']
- def auth(self, login, password):
- self._auth({'username': login.encode('utf8'), 'password': password.encode('utf8')})
- def raw_api(self, method, params = {}):
- # print params
- headers = {}
- if len(params):
- for k, v in params.items():
- if type(v) == unicode:
- params[k] = v.encode('utf-8')
- headers['Content-Type'] = 'application/x-www-form-urlencoded'
- data = urllib.urlencode(params) if len(params) else None
- # print data
- result = self.fetch_json('https://api.vk.com/method/' + method, data, headers)
- if 'error' in result:
- result = result['error']
- if result['error_code'] == 14:
- self.captcha_params(params, result)
- return self.raw_api(method, params)
- elif result['error_code'] == 5:
- raise VkAuthFailed(result['error_msg'])
- raise VkApiError(result['error_msg'])
- return result['response']
- def api(self, method, params = {}):
- params.setdefault('access_token', self.access_token)
- params.setdefault('v', self.api_version)
- return self.raw_api(method, params)
- def escape_quote(self, s):
- return s.replace('"', '\\"')
- def upload(self, upload_url, files):
- boundary = '--VkClientBoundary_' + uuid.uuid4().hex
- encoding = sys.getfilesystemencoding()
- L = []
- for fieldname, filename in files.items():
- extension = os.path.splitext(filename)[1]
- content_type = mimetypes.types_map[extension] if extension in mimetypes.types_map else 'application/octet-stream'
- basename = os.path.basename(filename).encode(encoding)
- L.extend([
- '--' + boundary,
- 'Content-Disposition: form-data; name="%s"; filename="%s"' % (self.escape_quote(fieldname), basename),
- 'Content-Type: ' + content_type,
- '',
- open(filename.encode(encoding), 'rb').read()
- ])
- L.extend([
- '--' + boundary + '--',
- ''
- ])
- headers = {
- 'Content-Type': 'multipart/form-data; boundary="%s"' % boundary,
- }
- data = '\r\n'.join(L)
- result = self.fetch_json(upload_url, data, headers)
- if 'error' in result:
- raise VkUploadError(result['error'])
- return result
- def filesdict(self, L):
- out = {}
- i = len(L)
- while i:
- out['file%s' % i] = L[i - 1]
- i -= 1
- return out
- def upload_photos(self, images, album_id, group_id = None):
- params = {'album_id': album_id}
- if group_id is not None:
- params['group_id'] = group_id
- server = self.api('photos.getUploadServer', params)
- i = 0
- total = len(images)
- while i < total:
- # только пять изображений можно загрузить за раз?
- limit = i + 5
- files = self.filesdict(images[i:limit])
- r = self.upload(server['upload_url'], files)
- # не все изменения в API документируются
- if not 'album_id' in r:
- r['album_id'] = r['aid']
- del r['aid']
- self.api('photos.save', r)
- i = limit
- # еще методы для загрузки
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement