Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import requests
- import json
- import const
- from array import *
- import base64
- import hmac,hashlib
- import pprint
- import time
- import urllib.parse
- class ICQClient(object):
- CLIENT_LOGIN_URL = 'https://api.login.icq.net/auth/clientLogin'
- CLIENT_MESSAGE_URL = 'http://api.icq.net/im/sendIM'
- CLIENT_BASE = 'https://rapi.icq.net'
- CLIENT_API_BASE = 'http://api.icq.net'
- CONNECTION_TIMEOUT = 60 * 1000
- DEV_ID = 'ic1Ytjc4pxslFTEL'
- CLIENT_NAME = 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/56.0.2924.87 Safari/537.36'
- CLIENT_VERSION = '1.0'
- sessionKey = ''
- def __init__(self, login, password):
- self.login = login
- self.password = password
- def _session_settings(self):
- STARTS_SESSION = {
- 'f': 'json',
- 'language': 'ru',
- 'events': 'mchat,userAddedToBuddyList,hist,imState,buddylist',
- 'includePresenceFields': 'aimId,friendly,state,ssl',
- 'sessionTimeout': '2592000',
- 'ts': round(time.time())
- }
- return STARTS_SESSION
- def _make_login_params(self):
- login_params = {
- 's': self.login,
- 'pwd': self.password,
- 'language': 'ru',
- 'time': time.time()+1000,
- 'idType': 'ICQ',
- 'tokenType':'longterm',
- 'k': 'ic17mFHiwr52TKrx',
- 'devId': '***',
- 'f':'json'
- }
- return login_params
- def do_login(self):
- try:
- print('Login')
- response = requests.post(self.CLIENT_LOGIN_URL, data=self._make_login_params(),timeout=self.CONNECTION_TIMEOUT)
- print(response.text)
- if response.status_code == 200:
- res_content = json.loads(response.text)
- session_params = self._session_settings()
- session_params['k'] = 'ic17mFHiwr52TKrx'
- session_params['a'] = res_content['response']['data']['token']['a']
- session_params['view'] = 'online'
- session_params['invisible'] = 'false'
- session_params['mobile'] = 0
- key = bytes(self.password, 'UTF-8')
- message = bytes(res_content['response']['data']['sessionSecret'], 'UTF-8')
- digester = hmac.new(key, message, hashlib.sha256)
- signature1 = digester.digest()
- signature2 = base64.urlsafe_b64encode(signature1)
- self.sessionKey = str(signature2, 'UTF-8')
- url = self.makeSignedUrl(self.CLIENT_API_BASE + '/aim/startSession', session_params, self.sessionKey, 'GET')
- response_session = requests.get(url)
- print(response_session.text)
- #return res_content.get('statusCode', 0), res_content.get('data', {})
- else:
- print('server return ' + response.status_code)
- return const.INTERNAL_ERROR_CODE, {}
- except requests.exceptions.Timeout:
- print('Timeout reached while connecting to server')
- #logging.info(traceback.format_exc())
- return const.INTERNAL_ERROR_CODE, {}
- def startSession(self):
- response = requests.get(self.CLIENT_API_BASE + '/aim/startSession', params=self._session_settings(), timeout=self.CONNECTION_TIMEOUT)
- pprint.pprint(response)
- def makeSignedUrl(self, url, params, sessionKey, method):
- sorted(params)
- params = urllib.parse.urlencode(params)
- i = method + '&' + urllib.parse.unquote(url) + '&' + urllib.parse.unquote(params)
- key = bytes(self.sessionKey, 'UTF-8')
- message = bytes(i, 'UTF-8')
- digester = hmac.new(key, message, hashlib.sha256)
- signature1 = digester.digest()
- signature2 = base64.urlsafe_b64encode(signature1)
- params += '&sig_sha256=' + str(signature2, 'UTF-8')
- print(url + '?' + params)
- return url + '?' + params;
- def do_send(self, uin, message):
- try:
- print('Send msg')
- payload = {
- t: uin,
- message: message,
- notifyDelivery: 0
- }
- response = requests.post(self.CLIENT_BASE, data=payload, timeout=self.CONNECTION_TIMEOUT)
- print(response)
- except:
- print('ex')
Add Comment
Please, Sign In to add comment