Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # Requires Python 3
- # abcdefghijklmnopqrstuvwxyz
- import gzip
- from http import cookiejar
- import io
- import json
- import rand
- import string
- import urllib.parse as uparse
- import urllib.request as urequest
- import zlib
# Default timeout passed to the opener for every request (seconds).
TIMEOUT = 15
# Default value sent in the User-Agent request header.
USER_AGENT = "Mozilla 5.0"
class HTTPErrorHandler(urequest.HTTPDefaultErrorHandler):
    """Error handler that returns HTTP error responses instead of raising.

    Installing this handler in an opener lets the caller inspect the status
    code of a failed request on the returned response object.
    """

    def http_error_default(self, req, fp, code, msg, hdrs):
        """Return the response object ``fp`` unchanged for any error status."""
        return fp
class Response(object):
    """Convenience wrapper around the response object returned by an opener.

    Copies ``url``, ``status``, ``reason``, ``version`` and ``headers`` from
    the wrapped object and adds lazily evaluated ``body`` / ``text`` /
    ``json`` accessors.  The decompressed body and the decoded text are
    cached after the first access.

    Instances can be used as context managers; leaving the ``with`` block
    closes the underlying response.
    """

    def __init__(self, response):
        """Wrap ``response`` (an object exposing the attributes read below)."""
        self.raw = response
        self.url = response.url
        self.status = response.status
        self.reason = response.reason
        self.version = response.version
        self.headers = response.headers
        # Charset declared in the Content-Type header, or None when absent.
        self.encoding = response.headers.get_content_charset()
        self._body = None
        self._text = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def read(self, amt=None):
        """Read up to ``amt`` raw (still compressed) bytes from the response."""
        return self.raw.read(amt)

    def close(self):
        """Close the underlying response object."""
        self.raw.close()

    @property
    def success(self):
        """True when the status code is in the 2xx range."""
        return 200 <= self.status < 300

    @property
    def fail(self):
        """True when the status code is outside the 2xx range."""
        return not self.success

    @staticmethod
    def _decompress(data, coding):
        """Undo the given (lower-cased) Content-Encoding on ``data``."""
        if coding in ('gzip', 'x-gzip'):
            return gzip.decompress(data)
        if coding == 'deflate':
            # Servers send either a raw deflate stream or a zlib-wrapped
            # one under this name; try raw first, then fall back.
            try:
                return zlib.decompress(data, -zlib.MAX_WBITS)
            except zlib.error:
                return zlib.decompress(data)
        return data

    @property
    def body(self):
        """Whole response payload as bytes, decompressed if necessary."""
        if self._body is None:
            # Header values are case-insensitive tokens, so normalise
            # before comparing.
            coding = (self.headers.get('content-encoding') or '')
            coding = coding.strip().lower()
            self._body = self._decompress(self.read(), coding)
        return self._body

    @property
    def text(self):
        """Body decoded to ``str``; undecodable bytes are replaced.

        Uses the charset from the response headers, falling back to UTF-8
        when none is declared or when the declared one is unknown to Python.
        """
        if self._text is None:
            try:
                self._text = str(self.body, self.encoding or 'utf-8',
                                 'replace')
            except LookupError:
                # The server advertised a codec Python does not know.
                self._text = str(self.body, 'utf-8', 'replace')
        return self._text

    def json(self, **kw):
        """Parse ``text`` as JSON; ``kw`` is forwarded to ``json.loads``."""
        return json.loads(self.text, **kw)
class MultipartData(object):
    """Incremental builder for a ``multipart/form-data`` request body.

    Call ``append`` once per form field or file, then send ``body`` with
    the header value given by ``contentType``.
    """

    # Alphabet the random boundary token is drawn from.
    BOUNDARY_CHARS = string.ascii_letters + string.digits

    def __init__(self, encoding='utf-8'):
        """Create an empty body.

        ``encoding`` is used for field names, file names and ``str`` values.
        """
        # Imported here because the module-level imports do not bring
        # ``random`` into scope.
        import random

        self._encoding = encoding
        self._body = b''
        self._boundary = ''.join(
            random.choice(self.BOUNDARY_CHARS) for _ in range(30))

    @property
    def boundary(self):
        """The 30-character boundary token separating the parts."""
        return self._boundary

    @property
    def contentType(self):
        """Value for the request's Content-Type header."""
        return 'multipart/form-data; boundary={0}'.format(self._boundary)

    def append(self, name, value, filename=None, contentType=None):
        """Add one part to the body.

        name        -- form field name (``str``).
        value       -- part payload; ``str`` is encoded with the instance
                       encoding, bytes-like objects are used as-is and any
                       other object is converted with ``str()`` first.
        filename    -- optional file name for the Content-Disposition header.
        contentType -- optional Content-Type header for this part.
        """
        pieces = [
            bytes('--{0}\r\n'.format(self._boundary), 'utf-8'),
            b'Content-Disposition: form-data; name="',
            bytes(name, self._encoding),
            b'"',
        ]
        if filename:
            pieces += [b'; filename="', bytes(filename, self._encoding), b'"']
        pieces.append(b'\r\n')
        if contentType:
            pieces.append(
                bytes('Content-Type: {0}\r\n'.format(contentType), 'utf-8'))

        if isinstance(value, (bytes, bytearray, memoryview)):
            payload = bytes(value)
        else:
            # Numbers and other scalars are sent as their text form rather
            # than failing on bytes concatenation.
            payload = bytes(str(value), self._encoding)

        # A blank line ends the part headers; CRLF ends the payload.
        pieces += [b'\r\n', payload, b'\r\n']
        self._body += b''.join(pieces)

    @property
    def body(self):
        """All appended parts followed by the closing boundary line."""
        closing = bytes('--{0}--\r\n\r\n'.format(self._boundary), 'utf-8')
        return self._body + closing
class Client(object):
    """Small cookie-aware HTTP client built on ``urllib.request``.

    HTTP error statuses do not raise: the opener is built with
    ``HTTPErrorHandler``, so every response is returned wrapped in a
    ``Response`` whose ``status`` can be inspected.
    """

    def __init__(self,
                 cj=None,
                 timeout=TIMEOUT,
                 userAgent=USER_AGENT):
        """Create a client.

        cj        -- cookie jar to use; a fresh ``CookieJar`` when omitted.
        timeout   -- timeout passed to the opener for every request.
        userAgent -- default User-Agent header value.
        """
        self.cj = cj or cookiejar.CookieJar()
        self.timeout = timeout
        self.userAgent = userAgent
        self.opener = urequest.build_opener(
            urequest.HTTPCookieProcessor(self.cj),
            HTTPErrorHandler())

    def _normalizeParams(self, params):
        """Flatten a dict into a list of ``(key, value)`` pairs.

        A list or tuple value yields one pair per element.  Anything that is
        not a dict is returned unchanged.
        """
        if not isinstance(params, dict):
            return params
        pairs = []
        for key, value in params.items():
            if isinstance(value, (list, tuple)):
                pairs.extend((key, item) for item in value)
            else:
                pairs.append((key, value))
        return pairs

    def encodeParams(self, params, urlEncoding=None):
        """Return ``params`` as a URL-encoded string."""
        params = self._normalizeParams(params)
        return uparse.urlencode(params, encoding=urlEncoding)

    def _appendQuery(self, url, query, urlEncoding):
        """Return ``url`` with ``query`` merged into its query string."""
        parts = uparse.urlsplit(url)
        extra = self.encodeParams(query, urlEncoding)
        merged = parts.query + '&' + extra if parts.query else extra
        return uparse.urlunsplit(parts._replace(query=merged))

    @staticmethod
    def _filePart(field, value):
        """Return ``(content, filename, contentType)`` for one upload.

        ``value`` is either an object with a ``read()`` method or the file
        content itself.  The file name comes from the object's ``name``
        attribute when it has one, otherwise the field name is used.
        """
        # Local imports: the module-level imports do not provide these.
        import mimetypes
        import os.path

        if hasattr(value, 'read'):
            declared = getattr(value, 'name', None)
            # ``name`` can be an int for files opened from a descriptor.
            filename = os.path.basename(str(declared)) if declared else ''
            content = value.read()
        else:
            filename = ''
            content = value
        filename = filename or field
        guessed = mimetypes.guess_type(filename)[0]
        return content, filename, guessed or 'application/octet-stream'

    def request(self,
                url,
                data=None,
                query=None,
                files=None,
                headers=None,
                method=None,
                urlEncoding='utf-8'):
        """Send a request and return the wrapped ``Response``.

        url         -- target URL.
        data        -- form fields (dict or list of pairs).  Sent
                       URL-encoded, or as multipart parts when ``files`` is
                       given.
        query       -- parameters (dict or list of pairs) appended to the
                       URL's query string.
        files       -- uploads (dict or list of pairs) mapping a field name
                       to a readable object or raw content; a list value
                       uploads several files under one field.
        headers     -- extra request headers; these override the defaults.
        method      -- HTTP method; when omitted ``urllib`` chooses based on
                       whether a body is present.
        urlEncoding -- encoding for the query string and form values.
        """
        requestHeaders = {'Accept': '*/*',
                          'Accept-Encoding': 'gzip, deflate',
                          'User-Agent': self.userAgent}

        if query:
            url = self._appendQuery(url, query, urlEncoding)

        if files:
            md = MultipartData(urlEncoding)
            # ``data`` may be None when only files are uploaded.
            for key, value in self._normalizeParams(data) or ():
                md.append(key, value)
            for key, value in self._normalizeParams(files):
                content, filename, contentType = self._filePart(key, value)
                md.append(key, content, filename, contentType)
            requestHeaders['Content-Type'] = md.contentType
            data = md.body
        elif data:
            requestHeaders['Content-Type'] = \
                'application/x-www-form-urlencoded'
            data = bytes(self.encodeParams(data, urlEncoding), 'ascii')

        # Caller-supplied headers win over the defaults set above.
        if headers:
            requestHeaders.update(headers)

        req = urequest.Request(url, data, requestHeaders, method=method)
        return Response(self.opener.open(req, timeout=self.timeout))
if __name__ == '__main__':
    # Manual smoke test against httpbin.org (requires network access).
    cl = Client()

    # Plain, gzip-compressed and deflate-compressed JSON responses.
    for endpoint in ("get", "gzip", "deflate"):
        resp = cl.request("http://httpbin.org/" + endpoint)
        try:
            print(resp.json())
        finally:
            resp.close()

    # An error status is returned as a normal response, not raised.
    r4 = cl.request("http://httpbin.org/status/404")
    try:
        print("{response.status}: {response.reason}".format(response=r4))
        failed = r4.fail
    finally:
        r4.close()
    # Explicit check instead of ``assert`` so it still runs under ``-O``.
    if failed:
        raise SystemExit("Response fail")

    # Legacy fallback-decoding snippet kept for reference (never executed;
    # ``r`` is not defined in this script):
    #     data = r.data
    #     try:
    #         data = data.decode(r.encoding)
    #     except UnicodeDecodeError:
    #         data = data.decode('1251')
    #     data = json.loads(data)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement