Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import os
- import sys
- from PyQt5.QtCore import Qt, QEvent, QObject, QFile, QIODevice, QJsonDocument, QJsonParseError, pyqtSlot, pyqtSignal
- from PyQt5 import QtCore, QtGui, QtWidgets
- from PyQt5.QtWebEngineWidgets import QWebEngineProfile, QWebEngineScript
- from PyQt5.QtWebChannel import QWebChannel, QWebChannelAbstractTransport
- from PyQt5.QtWebSockets import QWebSocketServer
- from PyQt5.QtNetwork import QHostAddress
class CallHandler(QObject):
    """QObject exposing one parameterless slot, ``test``."""

    @pyqtSlot()
    def test(self):
        """Write a fixed notice to stdout each time the slot is invoked."""
        notice = "call received"
        print(notice)
class WebSocketTransport(QWebChannelAbstractTransport):
    """Web channel transport that exchanges JSON text messages over one socket.

    Outgoing messages are serialized to compact JSON and sent as text;
    incoming text is parsed as JSON and, when it is a JSON object, re-emitted
    through ``messageReceived``.
    """

    # Socket wrapped by this transport; assigned in __init__.
    _socket = None

    def __init__(self, socket):
        """Wrap *socket* and subscribe to its incoming text messages.

        *socket* is also handed to the base-class constructor.
        """
        super().__init__(socket)
        self._socket = socket
        self._socket.textMessageReceived.connect(self.textMessageReceived)

    def sendMessage(self, message):
        """Serialize *message* as compact JSON and send it as a text message."""
        print("sendMessage")
        doc = QJsonDocument(message)
        # toJson() returns a QByteArray; go through bytes() before decoding,
        # the same way _inject_js handles QFile.readAll().
        payload = bytes(doc.toJson(QJsonDocument.Compact)).decode("utf-8")
        self._socket.sendTextMessage(payload)

    @pyqtSlot(str)
    def textMessageReceived(self, messageData):
        """Parse *messageData* as JSON and forward it via ``messageReceived``.

        Prints "1" when the text is not valid JSON and "2" when the parsed
        document is not a JSON object; in both cases nothing is emitted.
        """
        parser_error = QJsonParseError()
        message = QJsonDocument.fromJson(messageData.encode("utf-8"), parser_error)
        if parser_error.error != QJsonParseError.NoError:
            print("1")
        elif not message.isObject():
            print("2")
        else:
            print("MSG")
            # Copy the parsed object into a plain dict before emitting.
            a = {}
            a.update(message.object())
            self.messageReceived.emit(a, self)
            print("MSG OK")
class WebSocketClientWrapper(QObject):
    """Turn each new connection on a server into a ``WebSocketTransport``.

    Every time the wrapped server signals ``newConnection``, the next pending
    connection is wrapped in a transport and announced via ``clientConnected``.
    """

    # Server whose connections are wrapped; assigned in __init__.
    _server = None
    # Emitted with the freshly created transport for each new connection.
    clientConnected = pyqtSignal(QWebChannelAbstractTransport)

    def __init__(self, server, parent=None):
        """Remember *server* and listen for its ``newConnection`` signal."""
        super().__init__(parent)
        self._server = server
        server.newConnection.connect(self.handleNewConnection)

    @pyqtSlot()
    def handleNewConnection(self):
        """Wrap the next pending connection and emit ``clientConnected``."""
        print("CON")
        pending_socket = self._server.nextPendingConnection()
        transport = WebSocketTransport(pending_socket)
        self.clientConnected.emit(transport)
class Browser(QtWidgets.QMainWindow):
    """Main window wiring a web view, a QWebChannel and a local socket server.

    On construction it registers two scripts on the default web engine
    profile, builds the UI, registers a ``CallHandler`` on a ``QWebChannel``,
    starts a ``QWebSocketServer`` on localhost:12345 whose clients are
    connected to the channel, and attaches the channel to the web view's page.
    """

    # All of these are assigned in __init__.
    _ui = None
    _channel = None
    _handler = None
    _server = None
    _client_wrapper = None

    def __init__(self):
        """Build the window and set up the channel, server and scripts."""
        QtWidgets.QMainWindow.__init__(self)
        self._install_js()
        self._ui = Ui_Browser()
        self._ui.setupUi(self)

        self._channel = QWebChannel(self)
        self._handler = CallHandler(self._channel)
        # NOTE(review): the id is the literal string "self._handler"; it is
        # kept as-is because clients outside this file may look it up by
        # that name -- confirm before renaming.
        self._channel.registerObject("self._handler", self._handler)

        self._server = QWebSocketServer("Server", QWebSocketServer.NonSecureMode, self)
        self._client_wrapper = WebSocketClientWrapper(self._server, self._server)
        self._client_wrapper.clientConnected.connect(self._channel.connectTo)
        # listen() reports failure through its return value; surface it
        # instead of continuing silently with a server that accepts nothing.
        if not self._server.listen(QHostAddress.LocalHost, 12345):
            print(
                "WebSocket server failed to listen on port 12345: {}".format(
                    self._server.errorString()
                ),
                file=sys.stderr,
            )

        self._ui.web_view.page().setWebChannel(self._channel)

    def _install_js(self):
        """Register qwebchannel.js and the application's app.js."""
        self._inject_js(":/qtwebchannel/qwebchannel.js", QWebEngineScript.DocumentCreation)
        self._inject_js(os.path.join(BASE_DIR, "app.js"))

    def _inject_js(self, path, injection_point=QWebEngineScript.DocumentReady):
        """Read the script at *path* and add it to the default profile.

        The script is named after the basename of *path*, injected at
        *injection_point*, and does not run on sub-frames. If the file cannot
        be opened, a message is written to stderr and nothing is registered.
        """
        js_file = QFile(path)
        # open() returns False on failure; without this check an unreadable
        # file would be registered as a script with empty source.
        if not js_file.open(QIODevice.ReadOnly):
            print(
                "Cannot open script {!r}: {}".format(path, js_file.errorString()),
                file=sys.stderr,
            )
            return
        try:
            source = bytes(js_file.readAll()).decode("utf-8")
        finally:
            js_file.close()

        profile = QWebEngineProfile.defaultProfile()
        js = QWebEngineScript()
        js.setName(os.path.basename(path))
        js.setSourceCode(source)
        # TODO: check if it's possible to switch to ApplicationWorld
        # probably we need a different transport socket
        # https://bugreports.qt.io/browse/QTBUG-50318
        # NOTE(review): the line below already selects ApplicationWorld, so
        # the TODO above looks stale -- confirm and update it.
        js.setWorldId(QWebEngineScript.ApplicationWorld)
        js.setInjectionPoint(injection_point)
        js.setRunsOnSubFrames(False)
        profile.scripts().insert(js)
if __name__ == "__main__":
    # Script entry point: create the application, show the browser window,
    # and exit the process with the event loop's return code.
    app = QtWidgets.QApplication(sys.argv)
    main = Browser()
    main.show()
    exit_code = app.exec_()
    sys.exit(exit_code)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement