Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # -*- coding: utf-8 -*-
- import os
- import re
- import sys
- import json
- import time
- import uuid
- import urllib
- import urllib2
- import mimetypes
- class VKError(Exception):
- pass
- class VKValidationError(VKError):
- pass
- class VKCaptchaError(VKError):
- pass
- class VKAuthError(VKError):
- def __init__(self, t, d):
- super(VKAuthError, self).__init__("%s: %s" % (t, d))
- self.type=t
- class VKApiError(VKError):
- def __init__(self, c, d):
- super(VKApiError, self).__init__("[%s] %s" % (c, d))
- self.code=c
- class VKAgent(object):
- def __init__(self, access_token=None, user_id=None,
- expires=None, api_version=5.25, lang=None, user_agent="VKAgent"):
- self.access_token=access_token
- self.user_id=user_id
- self.expires=expires
- self.api_version=api_version
- self.lang=lang
- self.user_agent=user_agent
- def login(self, username, password, client_id=3140623,
- client_secret='VeWdmVclDCtn6ihuP1nt', scope=''):
- u"Прямая авторизация."
- params={
- 'username': username,
- 'password': password,
- 'client_id': client_id,
- 'client_secret': client_secret,
- 'scope': scope,
- 'grant_type': 'password',
- 'test_redirect_uri': 1,
- }
- self.auth('token', params)
- def api(self, method, params):
- u"Вызывает метод API Вконтакте."
- params=dict(params)
- if not 'access_token' in params and self.access_token:
- params['access_token']=self.access_token
- if not 'lang' in params and self.lang:
- params['lang']=self.lang
- if not 'v' in params:
- params['v']=self.api_version
- return self.raw_api(method, params)
- def auth(self, action, params):
- url='https://oauth.vk.com/' + action
- r=self.post(url, params)
- if 'error' in r:
- if 'need_validation' == r['error']:
- self.validate(r['redirect_uri'])
- elif 'need_captcha' == r['error']:
- self.captcha_params(params, r['captcha_img'], r['captcha_sid'])
- else:
- raise VKAuthError(r['error'], r['error_description'])
- return self.auth(action, params)
- self.access_token = r['access_token']
- self.user_id = r.get('user_id')
- if r['expires_in']:
- self.expires = r['expires_in'] + time.time()
- def raw_api(self, method, params):
- url='https://api.vk.com/method/' + method
- r=self.post(url, params)
- if 'response' in r:
- return r['response']
- error=r['error']
- if error['error_code'] == 14:
- self.captcha_params(params, error['captcha_img'], error['captcha_sid'])
- elif error['error_code'] == 17:
- self.validate(error['redirect_uri'])
- else:
- raise VKApiError(error['error_code'], error['error_msg'])
- self.raw_api(method, params)
- def post(self, url, params):
- params=self.encode_params(params)
- headers={'Content-Type': 'application/x-www-form-urlencoded'}
- return self.get_json(url, params, headers)
- def upload(self, url, files):
- u"Загружает файлы."
- lines=[]
- boundary=uuid.uuid4().hex
- content_type='application/octet-stream'
- for k, v in files.items():
- if re.match('(?i)https?://', v, re.I):
- r=self.fetch(v)
- content_type=r.headers.get('content-type', content_type)
- data=r.read()
- else:
- f=open(v, 'rb')
- data=f.read()
- f.close()
- ending=os.path.splitext(v)[1]
- content_type=mimetypes.types_map.get(ending, content_type)
- v=v.encode(sys.getfilesystemencoding())
- lines.extend([
- '--' + boundary,
- 'Content-Disposition: form-data; name="' + k +\
- '"; filename="' + os.path.basename(v) + '"',
- 'Content-Type: ' + content_type,
- '',
- data
- ])
- lines.extend([
- '--' + boundary + '--',
- ''
- ])
- data='\r\n'.join(lines)
- headers={'Content-Type': 'multipart/form-data; boundary=' + boundary}
- return self.exec_url(url, data, headers)
- def fetch(self, url, data=None, headers={}):
- url=str(url) # bugfix
- _headers={'User-Agent': self.user_agent}
- _headers.update(headers)
- r=urllib2.Request(url, data, _headers)
- try:
- r=urllib2.urlopen(r)
- except urllib2.HTTPError as r:
- pass
- return r
- def get_content(self, url, headers={}):
- return self.fetch(url, None, headers).read()
- def get_json(self, url, data=None, headers={}):
- j=self.fetch(url, data, headers).read()
- try:
- j=json.loads(j)
- except:
- # некоторые методы отдают данные в windows-1251 вместо utf-8
- j=json.loads(j, 'windows-1251')
- return j
- # !Неудачное имя метода.
- def exec_url(self, url, data=None, headers={}):
- u"""
- Возвращает response, либо бросает исключение, если в ответе есть
- поле error.
- """
- r = self.get_json(url, data, headers)
- if 'error' in r:
- raise VKError(r['error'])
- return r
- def encode_params(self, params):
- params=dict(params)
- for k, v in params.items():
- if type(v) == unicode:
- params[k]=v.encode('utf-8')
- return urllib.urlencode(params)
- def get_captcha_key(self, url):
- u"Возвращает значение капчи."
- print url
- raise VKCaptchaError('Captcha needed.')
- def captcha_params(self, params, url, sid):
- u"Добавляет парамерты капчи."
- params['captcha_sid']=sid
- params['captcha_key']=self.get_captcha_key(url)
- def validate(self, url):
- u"""
- Осуществляет автоматическую валидацию пользователя при заходе из
- подозрительного места.
- """
- s=self.get_content(url)
- m=re.search(r'/security_check\?[^"]+', s)
- if m:
- u='https://oauth.vk.com' + m.group(0)
- r=self.fetch(u)
- if r.geturl() == 'https://oauth.vk.com/blank.html?success=1':
- return
- raise VKValidationError('Validation failed.')
- ###############################################################################
- if __name__ == '__main__':
- from account import *
- vk=VKAgent()
- vk.login(username, password)
- api=vk.api
- sl=time.sleep
- # 35 запросов в секунду ~0.03
- for i in range(20):
- api('wall.post', dict(message="Тест %s" % (i + 1)))
- sl(0.1)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement