Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # -*- coding: u8 -*-
- import sys
- import urllib
- import time
- import utils
- import json
- import re
- import httplib
- import keyword
- MATCH_PARAM = '_(' + '|'.join(keyword.kwlist) + ')$'
- class VKError(Exception):
- pass
- class VKValidationError(VKError):
- pass
- class VKCaptchaError(VKError):
- pass
- class VKAuthorizationError(VKError):
- def __init__(self, resp):
- super(VKAuthorizationError, self).__init__('{error} - {error_description}'.format(**resp))
- self.error = resp['error']
- class VKApiCallError(VKError):
- def __init__(self, error):
- super(VKApiCallError, self).__init__('[{error_code}] {error_msg}'.format(**error))
- self.code = error['error_code']
- class VKRobot(object):
- def __init__(self, username=None, password=None, delay_time=0, api_version=5.23, user_agent='VKRobot'):
- self.username = username
- self.password = password
- self.delay_time = delay_time
- self.user_agent = user_agent
- self.api_version = api_version
- # пробуем авторизоваться
- p = {
- 'username': username,
- 'password': password,
- 'client_id': 3140623,
- 'client_secret': 'VeWdmVclDCtn6ihuP1nt',
- 'grant_type': 'password',
- 'test_redirect_uri': 1
- }
- while 1:
- u = 'https://oauth.vk.com/token?' + self._encodeParams(p)
- r = self.getJSON(u)
- if not 'error' in r:
- break
- if 'need_validation' == r['error']:
- self.validate(r)
- elif 'need_captcha' == r['error']:
- self._captchaParams(p, r)
- else:
- raise VKAuthorizationError(r)
- self.access_token = r['access_token']
- self.user_id = r['user_id']
- def call(self, _method, **params):
- u"""
- Вызывает метод API и в случае удачи возвращает response. Если имя
- параметра совпадает с ключевым словом, то к имени нужно добавить
- подчеркивание, например: _from, _global.
- """
- params['access_token'] = self.access_token
- params.setdefault('v', self.api_version)
- u = 'https://api.vk.com/method/' + _method
- for k, v in params.items():
- m = re.match(MATCH_PARAM, k)
- if m:
- params[m.group(1)] = params.pop(k)
- h = {'Content-Type': 'application/x-www-form-urlencoded'}
- while 1:
- d = self._encodeParams(params)
- print u
- print params
- r = self.getJSON(u, d, h)
- if 'response' in r:
- return r['response']
- r = r['error']
- if r['error_code'] == 14:
- self._captchaParams(params, r)
- elif r['error_code'] == 17:
- self.validate(r)
- else:
- raise VKApiCallError(r)
- def get(self, url, data=None, headers={}):
- u"""
- Возвращает response, либо бросает исключение, если в ответе есть
- поле error.
- """
- r = self.getJSON(url, data, headers)
- if 'error' in r:
- raise VKError(r['error'])
- return r
- def upload(self, url, files):
- u"""
- Загружает файлы. files - словарь, ключами которого являются имена
- полей, а значениями пути до файлов.
- """
- L = []
- b = uuid.uuid4().hex
- t = 'application/unknown'
- for n, p in files.items():
- if re.match('(?i)https?://', p, re.I):
- p = str(p)
- u = urllib.urlopen(p)
- t = u.info().get('content-type', t)
- d = u.read()
- else:
- f = open(src, 'rb')
- d = f.read()
- f.close()
- e = os.path.splitext(src)[1]
- t = mimetypes.types_map.get(e, t)
- p = p.encode(sys.getfilesystemencoding())
- L.extend([
- '--' + b,
- 'Content-Disposition: form-data; name="' + n + '"; filename="'\
- + os.path.basename(p) + '"',
- 'Content-Type: ' + t,
- '',
- d
- ])
- L.extend([
- '--' + b + '--',
- ''
- ])
- h = {'Content-Type': 'multipart/form-data; boundary=' + b}
- return self.get(url, '\r\n'.join(L), h)
- def uploadPhoto(self, u, p):
- u"""Загружает фото."""
- return self.upload(u, {'photo': p})
- def uploadFile(self, u, p):
- return self.upload(u, {'file': p})
- def uploadFiles(self, u, L):
- u"""Загружает список файлов."""
- a = {}
- for i in range(len(L)):
- a['file%i' % (i + 1)] = L[i]
- return self.upload(u, a)
- def upload_video(self, url, src):
- u"""Загружает видео."""
- return self.upload(url, {'video_file': src})
- def getJSON(self, url, data=None, headers={}):
- u"""Возвращает результат парсинга JSON."""
- headers['User-Agent'] = self.user_agent
- if self.delay_time:
- time.sleep(self.delay_time)
- j = utils.get_page(url, data, headers)
- try:
- j = json.loads(j)
- except:
- # некоторые методы API отдают данные в windows-1251, вместо UTF-8
- j = json.loads(j, '1251')
- return j
- def validate(self, resp):
- u"""
- Осуществляет автоматическую валидацию пользователя при заходе из
- подозрительного места.
- """
- s = utils.get_page(resp['redirect_uri'])
- m = re.search(r'/security_check\?[^"]+', s)
- if m:
- u = m.group(0)
- c = httplib.HTTPConnection('oauth.vk.com')
- c.request('GET', u)
- r = c.getresponse()
- h = dict(r.getheaders())
- if h['location'] == '/blank.html?success=1':
- print u'Валидация пройдена.'
- return
- raise VKValidationError('Validation failed.')
- def getCaptchaKey(self, url):
- u"""Возвращает значение капчи."""
- raise VKCaptchaError('Captcha needed.')
- def _captchaParams(self, params, resp):
- u"""Добавляет парамерты капчи."""
- params['captcha_sid'] = resp['captcha_sid']
- params['captcha_key'] = self.getCaptchaKey(resp['captcha_img'])
- def _encodeParams(self, params):
- u"""Формирует строку параметров."""
- for k, v in params.items():
- if type(v) == None:
- del params[k]
- continue
- if type(v) == str:
- v = v.decode(sys.stdin.encoding)
- if type(v) == unicode:
- v = v.encode('u8')
- params[k] = v
- return urllib.urlencode(params)
- #################### GENEREATED CODE GOES AFTER THIS LINE ####################
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement