Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # -*- coding: u8 -*-
- ################################################################################
- # #
- # @link http://vk.com/ Персональная страница автора Вконтакте. #
- # #
- # @link https://github.com/err0x0/ Мои репозитории. #
- # #
- # Copyright (c) 2014 Сергей Снегирев <tz4678@gmail.com> #
- # #
- # Данная лицензия разрешает лицам, получившим копию данного программного #
- # обеспечения и сопутствующей документации (в дальнейшем именуемыми #
- # «Программное Обеспечение»), безвозмездно использовать Программное #
- # Обеспечение без ограничений, включая неограниченное право на использование, #
- # копирование, изменение, добавление, публикацию, распространение, #
- # сублицензирование и/или продажу копий Программного Обеспечения, также как и #
- # лицам, которым предоставляется данное Программное Обеспечение, при #
- # соблюдении следующих условий: #
- # #
- # Указанное выше уведомление об авторском праве и данные условия должны быть #
- # включены во все копии или значимые части данного Программного Обеспечения. #
- # #
- # ДАННОЕ ПРОГРАММНОЕ ОБЕСПЕЧЕНИЕ ПРЕДОСТАВЛЯЕТСЯ «КАК ЕСТЬ», БЕЗ КАКИХ-ЛИБО #
- # ГАРАНТИЙ, ЯВНО ВЫРАЖЕННЫХ ИЛИ ПОДРАЗУМЕВАЕМЫХ, ВКЛЮЧАЯ, НО НЕ ОГРАНИЧИВАЯСЬ #
- # ГАРАНТИЯМИ ТОВАРНОЙ ПРИГОДНОСТИ, СООТВЕТСТВИЯ ПО ЕГО КОНКРЕТНОМУ НАЗНАЧЕНИЮ #
- # И ОТСУТСТВИЯ НАРУШЕНИЙ ПРАВ. НИ В КАКОМ СЛУЧАЕ АВТОРЫ ИЛИ ПРАВООБЛАДАТЕЛИ НЕ #
- # НЕСУТ ОТВЕТСТВЕННОСТИ ПО ИСКАМ О ВОЗМЕЩЕНИИ УЩЕРБА, УБЫТКОВ ИЛИ ДРУГИХ #
- # ТРЕБОВАНИЙ ПО ДЕЙСТВУЮЩИМ КОНТРАКТАМ, ДЕЛИКТАМ ИЛИ ИНОМУ, ВОЗНИКШИМ ИЗ, #
- # ИМЕЮЩИМ ПРИЧИНОЙ ИЛИ СВЯЗАННЫМ С ПРОГРАММНЫМ ОБЕСПЕЧЕНИЕМ ИЛИ ИСПОЛЬЗОВАНИЕМ #
- # ПРОГРАММНОГО ОБЕСПЕЧЕНИЯ ИЛИ ИНЫМИ ДЕЙСТВИЯМИ С ПРОГРАММНЫМ ОБЕСПЕЧЕНИЕМ. #
- # #
- ################################################################################
- __author__ = u'<tz4678@gmail.com>'
- import sys, os, time, urllib, urllib2, httplib, re, json, uuid, mimetypes,\
- keyword, captcha
- UNDERSCORE_KEYWORD_PATTERN = '_(' + '|'.join(keyword.kwlist) + ')$'
- class VKError(Exception):
- pass
- class VKValidationError(VKError):
- pass
- class VKCaptchaError(VKError):
- pass
- class VKAuthorizationError(VKError):
- def __init__(self, error_type, description):
- super(VKAuthorizationError, self).\
- __init__('%s - %s' % (error_type, description))
- self.type = error_type
- class VKApiError(VKError):
- def __init__(self, code, message):
- super(VKApiError, self).__init__('[%s] %s' % (code, message))
- self.code = code
- class VKClient(object):
- def __init__(self, access_token=None, user_id=None,
- expires=None, delay_time=0, encoding=None, api_version=5.23,
- lang=None, user_agent='VKClient'):
- self.access_token = access_token
- self.user_id = user_id
- self.expires = expires
- self.delay_time = delay_time
- self.encoding = encoding
- self.api_version = api_version
- self.lang = lang
- self.user_agent = user_agent
- def api(self, _method, **params):
- u"""
- Вызывает метод API и в случае удачи возвращает response. Если имя
- параметра совпадает с ключевым словом, то к имени нужно добавить
- подчеркивание, например: _from, _global.
- """
- if self.access_token:
- params['access_token'] = self.access_token
- if self.lang:
- params['lang'] = self.lang
- params['v'] = self.api_version
- u = 'https://api.vk.com/method/' + _method
- for k, v in params.items():
- m = re.match(UNDERSCORE_KEYWORD_PATTERN, k)
- if m:
- # _from -> from
- params[m.group(1)] = params.pop(k)
- # print params
- h = {'Content-Type': 'application/x-www-form-urlencoded'}
- while 1:
- d = self.build_query(params)
- r = self.get_json(u, d, h)
- if 'response' in r:
- return r['response']
- r = r['error']
- if r['error_code'] == 14:
- self.captcha_params(params, r['captcha_img'], r['captcha_sid'])
- elif r['error_code'] == 17:
- self.validate(r['redirect_uri'])
- else:
- raise VKApiError(r['error_code'], r['error_msg'])
- # прямая авторизация
- def login(self, username, password, client_id=3140623,
- client_secret='VeWdmVclDCtn6ihuP1nt', scope=None):
- self.oauth('token', {
- 'username': username,
- 'password': password,
- 'client_id': client_id,
- 'client_secret': client_secret,
- 'scope': scope,
- 'grant_type': 'password',
- })
- def oauth(self, action, params):
- while 1:
- u = 'https://oauth.vk.com/' + action + '?' + self.build_query(params)
- r = self.get_json(u)
- if not 'error' in r:
- break
- if 'need_validation' == r['error']:
- self.validate(r['redirect_uri'])
- elif 'need_captcha' == r['error']:
- self.captcha_params(params, r['captcha_img'], r['captcha_sid'])
- else:
- raise VKAuthorizationError(r['error'], r['error_description'])
- self.access_token = r['access_token']
- self.user_id = r.get('user_id')
- if r['expires_in']:
- self.expires = r['expires_in'] + time.time()
- def get(self, url, data=None, headers={}):
- u"""
- Возвращает response, либо бросает исключение, если в ответе есть
- поле error.
- """
- r = self.get_json(url, data, headers)
- if 'error' in r:
- raise VKError(r['error'])
- return r
- def get_json(self, url, data=None, headers={}):
- headers = dict(headers)
- headers['User-Agent'] = self.user_agent
- if self.delay_time:
- self.delay(self.delay_time)
- js = self.get_page(url, data, headers)
- try:
- js = json.loads(js)
- except:
- # некоторые методы API отдают данные в windows-1251, вместо UTF-8
- js = json.loads(js, '1251')
- return js
- def get_page(self, url, data=None, headers={}):
- url = str(url)
- r = urllib2.Request(url, data, headers)
- try:
- u = urllib2.urlopen(r)
- except urllib2.HTTPError as u:
- pass
- s = u.read()
- return s
- def delay(self, sec):
- time.sleep(sec)
- def validate(self, url):
- u"""
- Осуществляет автоматическую валидацию пользователя при заходе из
- подозрительного места.
- """
- s = self.get_page(url)
- m = re.search(r'/security_check\?[^"]+', s)
- if m:
- u = m.group(0)
- c = httplib.HTTPConnection('oauth.vk.com')
- c.request('GET', u)
- r = c.getresponse()
- h = dict( r.getheaders() )
- if h['location'] == '/blank.html?success=1':
- return
- raise VKValidationError('Validation failed.')
- def upload(self, url, files):
- u"""Загружает файлы."""
- lines = []
- boundary = uuid.uuid4().hex
- content_type = 'application/unknown'
- for k, v in files.items():
- if re.match('(?i)https?://', v, re.I):
- v = str(v)
- u = urllib.urlopen(v)
- content_type = u.info().get('content-type', content_type)
- data = u.read()
- else:
- f = open(v, 'rb')
- data = f.read()
- f.close()
- ending = os.path.splitext(v)[1]
- content_type = mimetypes.types_map.get(ending, content_type)
- v = v.encode( sys.getfilesystemencoding() )
- lines.extend([
- '--' + boundary,
- 'Content-Disposition: form-data; name="' + k +\
- '"; filename="' + os.path.basename(v) + '"',
- 'Content-Type: ' + content_type,
- '',
- data
- ])
- lines.extend([
- '--' + boundary + '--',
- ''
- ])
- return self.get(url, '\r\n'.join(lines),
- {'Content-Type': 'multipart/form-data; boundary=' + boundary})
- def upload_photo(self, url, filename):
- u"""Загружает фото."""
- return self.upload(url, {'photo': filename})
- def upload_file(self, url, filename):
- return self.upload(url, {'file': filename})
- def upload_files(self, url, file_list):
- u"""Загружает список файлов."""
- files = {}
- for i in range( len(file_list) ):
- files['file%i' % (i + 1)] = file_list[i]
- return self.upload(url, files)
- def upload_photos(self, photo_list, album_id, group_id=None):
- u"""Грузим фотографии в альбом."""
- u = self.api('photos.getUploadServer', album_id=album_id,
- group_id=group_id)['upload_url']
- ret = []
- for i in range(0, len(photo_list), 5):
- L = photo_list[i : i + 5]
- r = self.upload_files(u, L)
- r['album_id'] = r.pop('aid')
- ret += self.api('photos.save', **r)
- return ret
- # 0 - себе на стену
- def upload_wall_photo(self, filename, owner_id=0):
- u"""Грузим фото на стену."""
- gid = abs(owner_id) if owner_id < 1 else None
- u = self.api('photos.getWallUploadServer', group_id=gid)\
- ['upload_url']
- r = self.upload_photo(u, filename)
- if gid:
- r['group_id'] = gid
- elif owner_id:
- r['user_id'] = owner_id
- return self.api('photos.saveWallPhoto', **r)
- def upload_owner_photo(self, filename, group_id=0):
- u"""Грузим фото на аву."""
- owner_id = None
- if group_id:
- owner_id = -group_id
- u = self.api('photos.getOwnerPhotoUplo adServer', owner_id=owner_id)\
- ['upload_url']
- r = self.upload_photo(u, filename)
- return self.api('photos.saveOwnerPhoto', **r)
- def upload_message_photo(self, filename):
- u"""Грузим фото в личное сообщение."""
- u = self.api('photos.getMessagesUploadServer')['upload_url']
- r = self.upload_photo(u, filename)
- return self.api('photos.saveMessagesPhoto', **r)
- def upload_audio(self, filename, title=None, artist=None):
- u"""Грузим аудиозапись."""
- u = self.api('audio.getUploadServer')['upload_url']
- r = self.upload_file(u, filename)
- r['title'] = title
- r['artist'] = artist
- return self.api('audio.save', **r)
- def upload_doc(self, filename, group_id=None):
- u"""Грузим документ в 'Документы'."""
- u = self.api('docs.getUploadServer', group_id=group_id)\
- ['upload_url']
- r = self.upload_file(u, filename)
- return self.api('docs.save', **r)
- def upload_wall_doc(self, filename, group_id=None):
- u"""Грузим документ на стену."""
- u = self.api('docs.getWallUploadServer', group_id=group_id)\
- ['upload_url']
- r = self.upload_file(u, filename)
- return self.api('docs.save', **r)
- def upload_video(self, filename, name=None, description=None,
- is_private=None, wallpost=None, group_id=None, album_id=None,
- repeat=None):
- u"""Загружает и сохраняет видеофайл."""
- ret = {}
- res = self.api('video.save', name=name, description=description,
- is_private=is_private, wallpost=wallpost, group_id=group_id,
- album_id=album_id, repeat=repeat)
- ret['video_id'] = res['video_id']
- ret['owner_id'] = res['owner_id']
- # содержит единственное поле 'size'
- ret['size'] = self.upload(res['upload_url'], {'video_file': filename})\
- ['size']
- return ret
- def import_video(self, url, name=None, description=None, is_private=None,
- wallpost=None, group_id=None, album_id=None, repeat=None):
- u"""Импортуриует видео с популярных видеохостингов типа YouTube."""
- u = self.api('video.save', name=name, description=description,
- is_private=is_private, wallpost=wallpost, link=url,
- group_id=group_id, album_id=album_id, repeat=repeat)['upload_url']
- return self.get(u)
- def build_attachments(self, prefix, obj, key='id'):
- sf = prefix + '{owner_id}_{' + key + '}'
- return ( ','.join([sf.format(**x) for x in obj]) if type(obj) == list else\
- sf.format(**obj) ) + ','
- def build_query(self, params):
- d = {}
- for k, v in params.items():
- if v == None:
- continue
- if type(v) == str:
- if self.encoding:
- v = v.decode(self.encoding)
- if type(v) == unicode:
- v = v.encode('u8')
- d[k] = v
- return urllib.urlencode(d)
- def get_captcha_key(self, url):
- u"""Возвращает значение капчи."""
- return captcha.render_captcha( url, os.path.basename(sys.argv[0]) )
- raise VKCaptchaError('Captcha needed.')
- def captcha_params(self, params, url, sid):
- u"""Добавляет парамерты капчи."""
- params['captcha_sid'] = sid
- params['captcha_key'] = self.get_captcha_key(url)
Add Comment
Please, Sign In to add comment