Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from PyQt5.QtCore import (
- QEventLoop,
- QObject,
- QUrl
- )
- from PyQt5.QtNetwork import (
- QNetworkAccessManager,
- QNetworkReply,
- QNetworkRequest
- )
- from urllib.parse import urlencode, parse_qsl
- import io
- import json
- import logging
- import mimetypes
- import os
- import re
- import uuid
- log = logging.getLogger(__name__)
- RE_HEADER = re.compile("[a-z]+(-[a-z]+)*$", re.I)
- class Client(QObject):
- def __init__(self, networkManager=None):
- super().__init__()
- self.networkManager = networkManager or QNetworkAccessManager(self)
- # Опциии
- # Максимально допустимое количество редиректов
- self.maxRedirects = 3
- # Автоматически переходить по ссылке при перенаправлении
- self.followLocation = True
- # Подставлять referer при редиректе
- self.autoReferer = True
- self._headers = HttpHeaders({
- 'user-agent': 'Mozilla 5.0 (PyQt5 Client)'
- })
- @property
- def headers(self):
- return self._headers
- @headers.setter
- def headers(self, value):
- if not isinstance(value, HttpHeaders):
- value = HttpHeaders(value)
- self._headers = value
- def request(self,
- url,
- params={},
- data={},
- files={},
- headers={},
- encoding="utf-8", # Кодировка страницы. В requests нет
- _history=[]):
- url = QUrl(url)
- defaults = self.headers.copy()
- defaults.update(headers)
- headers = defaults
- if params:
- q = url.query()
- q = parse_qsl(q, encoding=encoding)
- q = dict(q)
- q.update(params)
- q = urlencode(q, encoding=encoding)
- url.setQuery(q)
- if data or files:
- if files:
- boundary = uuid.uuid4().hex
- delim = "--{}\r\n".format(boundary).encode('ascii')
- end = "--{}--\r\n".format(boundary).encode('ascii')
- buf = io.BytesIO()
- for k, v in data.items():
- buf.write(delim)
- buf.write(
- ('Content-Disposition: form-data;'
- ' name="{}"\r\n\r\n').format(k).encode(encoding)
- )
- buf.write(str(v).encode(encoding))
- buf.write(b"\r\n")
- for name, value in files.items():
- if isinstance(value, (list, tuple)):
- # ('filename', b'<binary_data>')
- filename, content = value
- if not isinstance(content, (bytearray, bytes)):
- content = str(content).encode('u8')
- else:
- filename = value
- content = open(filename, 'rb').read()
- buf.write(delim)
- buf.write(
- ('Content-Disposition: form-data;'
- ' name="{}";'
- ' filename="{}"\r\n'
- 'Content-Type: {}\r\n\r\n').format(
- name,
- os.path.basename(filename),
- (mimetypes.guess_type(filename)[0] or
- "application/octet-stream")
- ).encode(encoding)
- )
- buf.write(content)
- buf.write(b"\r\n")
- buf.write(end)
- data = buf.getvalue()
- headers['content-type'] = \
- "multipart/form-data; boundary={}".format(boundary)
- else:
- data = urlencode(data, encoding=encoding).encode('ascii')
- headers['content-type'] = "application/x-www-form-urlencoded"
- request = QNetworkRequest(QUrl(url))
- for header, value in headers.items():
- request.setRawHeader(
- header.encode('ascii'), str(value).encode('ascii'))
- if data:
- reply = self.networkManager.post(request, data)
- else:
- reply = self.networkManager.get(request)
- loop = QEventLoop()
- reply.finished.connect(loop.quit)
- loop.exec_()
- # http://doc.qt.io/qt-4.8/qnetworkreply.html#error
- error = reply.error()
- # http://doc.qt.io/qt-5/qnetworkreply.html#NetworkError-enum
- if (error != QNetworkReply.NoError and
- # 401 ошибка (возникает при авторизации)
- error != QNetworkReply.ContentAccessDenied and
- # 404 тоже содержит контент
- error != QNetworkReply.ContentNotFoundError):
- # http://doc.qt.io/qt-4.8/qiodevice.html#errorString
- raise NetworkError(reply.errorString())
- response = Response(reply, _history)
- location = response.headers.get("location")
- # На Википедии написано, что вроде как, если метод не является
- # методом HEAD или GET, то пользователь должен подтвердить
- # действие
- if (location and
- self.followLocation and
- (300 < response.status <= 308)):
- if len(_history) == self.maxRedirects:
- raise ClientError(
- "Max redirects ({}) exceeded".format(self.maxRedirects))
- _history.append(response)
- if self.autoReferer:
- # Добавляем referer
- headers["referer"] = response.url
- log.debug("Follow location: %s", location)
- response = self.request(
- url.resolved(QUrl(location)),
- headers=headers,
- _history=_history
- )
- return response
- def get(self, url, params={}, **kw):
- return self.request(url, params, **kw)
- def post(self, url, data={}, files={}, **kw):
- return self.request(url, data=data, files=files, **kw)
- class JsonObject(dict):
- def __getattr__(self, attr):
- return self[attr]
- def __setattr__(self, attr, value):
- self[attr] = value
- class Response:
- def __init__(self, reply, history):
- self.reply = reply
- # self.request = reply.request()
- self.history = list(history)
- self.url = reply.url().toString()
- self.status = reply.attribute(QNetworkRequest.HttpStatusCodeAttribute)
- self.reason = reply.attribute(
- QNetworkRequest.HttpReasonPhraseAttribute)
- headers = {str(v[0], "ascii"): str(v[1], "ascii")
- for v in reply.rawHeaderPairs()}
- self.headers = HttpHeaders(headers)
- # media subtypes of the "text" type are defined to have a
- # default charset value of "ISO-8859-1" when received via HTTP
- self.encoding = self.headers.charset or "iso-8859-1"
- self._content = None
- @property
- def content(self):
- if self._content is None:
- self._content = self.read()
- return self._content
- @property
- def text(self):
- return str(self.content, self.encoding, errors='replace')
- def json(self, object_hook=JsonObject, **kw):
- return json.loads(self.text, object_hook=object_hook, **kw)
- def read(self, amt=None):
- if amt:
- data = self.reply.read(amt)
- else:
- # Эта вроде возвращает QByteArray, а read просто байты (может баг
- # или так и должно быть)
- data = self.reply.readAll().data()
- return data
- def close(self):
- pass
- def __repr__(self):
- return "<{} [{}]>".format(self.__class__.__name__, self.status)
- class HttpHeaders(dict):
- def __init__(self, *args, **kwargs):
- super().__init__()
- self.update(*args, **kwargs)
- @property
- def contentType(self):
- return self.get("content-type")
- @property
- def mimeType(self):
- if self.contentType:
- return self.contentType.split(';')[0]
- @property
- def charset(self):
- if self.contentType:
- match = re.search(
- r";[ \t]+charset=([^;]+)", self.contentType, re.I)
- if match:
- return match.group(1)
- @staticmethod
- def normalize(key):
- return "-".join(map(str.capitalize, key.split("-")))
- def get(self, key, default=None):
- return super().get(self.normalize(key), default)
- def setdefault(self, key, default=None):
- return super().setdefault(self.normalize(key), default)
- def pop(self, key):
- return super().pop(self.normalize(key))
- def popitem(self, key):
- return super().popitem(self.normalize(key))
- def update(self, *args, **kwargs):
- d = dict(*args, **kwargs)
- for k, v in d.items():
- self[k] = v
- def copy(self):
- return self.__class__(self)
- def __setitem__(self, name, value):
- if not RE_HEADER.match(name):
- raise TypeError("Bad header name: {}".format(name))
- super().__setitem__(self.normalize(name), value)
- def __getitem__(self, key):
- return super().__getitem__(self.normalize(key))
- def __delitem__(self, key):
- return super().__delitem__(self.normalize(key))
- def __contains__(self, key):
- return super().__contains__(self.normalize(key))
- class ClientError(Exception):
- pass
- class NetworkError(ClientError):
- pass
- if __name__ == '__main__':
- from PyQt5.QtCore import QCoreApplication
- import sys
- logging.basicConfig(level=logging.DEBUG)
- app = QCoreApplication(sys.argv)
- http = Client()
- r = http.get("http://httpbin.org/get", {"foo": "bar"})
- print("Get request:")
- print("Url:", r.url)
- print("Code:", r.status)
- print("Message:", r.reason)
- print(r.json().args)
- r = http.post("http://httpbin.org/post", {"foo": "bar"})
- print("Post request:")
- print(r.json().form)
- r = http.post("http://httpbin.org/post", files={"file": __file__})
- print("File uploading:")
- print(r.json().files)
- r = http.get("http://httpbin.org/redirect/3", {"foo": "bar"})
- print(r.text)
- print("History:")
- print("\n".join([v.url for v in r.history]))
- print("Prev history:")
- print("\n".join([v.url for v in r.history[-1].history]))
- # app.exec_()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement