New Socket Concept
dragonbane, Oct 19th, 2020 (edited)
Socket Proxy Server:
-Use the new http downloader for video frames and bent for the rest
-The http downloader for video frames allocates extra bytes at the front by using one more array buffer in the array list. That array buffer is a static 9 bytes, pre-filled with the video frame res namespace and event name ID, with a DataView and a Uint8Array view kept on it. The nonce and the rest are left empty. After the download, the size byte is written via direct Uint8Array access and then the DataView writes the full size in. The view is clamped and added to the array list to create the return buffer. Return 2 views: one over the normal buffer at offset 0 (event buffer), the other starting at the beginning of the raw video package (for ffmpeg). Then only the nonce needs updating before dispatch to clients (see the header-buffer sketch below)
-Consider making playlist updates incremental. Request the full playlist once at start (sequence ID: -1, sent gzip compressed). The client then builds a sequence index and asks for incremental updates with the playlist hash ID and the last sequence ID it has tracked. The server responds with every 32 byte hash/duration (as float) it has after that ID, as an array, if any. The client appends an EXTINF 4 byte float and a fake URL built from the 32 byte hash for every new entry to the end of the playlist array, removes x entries from the start and advances the header data automatically with the sequence ID (see the playlist-delta sketch below)
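A minimal sketch of the pre-allocated header idea. The 9-byte layout used here (namespace 1 | event ID 1 | nonce 2 | sizeLength 1 | size 4) and all field values are assumptions for illustration; this sketch also copies into one buffer for simplicity, whereas the note's version keeps an array list of buffers to avoid the copy:

// Sketch: pre-allocated event header in front of a downloaded video frame.
// ASSUMED layout: namespace (1) | event ID (1) | nonce (2) | sizeLength (1) | size (4).
const HEADER_LEN = 9;
const header = new ArrayBuffer(HEADER_LEN);
const headerU8 = new Uint8Array(header);
const headerView = new DataView(header);

headerU8[0] = 0x01; // video frame res namespace ID (assumed value)
headerU8[1] = 0x02; // event name ID (assumed value)
// bytes 2..8 (nonce, sizeLength, size) stay empty until after the download

function wrapFrame(chunks: Uint8Array[]): { event: Uint8Array; raw: Uint8Array } {
    const rawLen = chunks.reduce((n, c) => n + c.length, 0);
    headerU8[4] = 4;                                  // sizeLength: full size fits in 4 bytes
    headerView.setUint32(5, HEADER_LEN + rawLen);     // DataView writes the full size in
    // Assemble the return buffer: header first, then the downloaded chunks
    const out = new Uint8Array(HEADER_LEN + rawLen);
    out.set(headerU8, 0);
    let off = HEADER_LEN;
    for (const c of chunks) { out.set(c, off); off += c.length; }
    return { event: out, raw: out.subarray(HEADER_LEN) }; // second view for ffmpeg
}

function setNonce(event: Uint8Array, nonce: number): void {
    // only the nonce needs updating before dispatch to clients
    new DataView(event.buffer, event.byteOffset).setUint16(2, nonce);
}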
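And a sketch of the incremental playlist update on the client side. The delta entry shape and the makeFakeUrl helper are hypothetical:

// Sketch: client applies an incremental playlist update.
// Each delta entry is assumed to be { hash: Uint8Array /* 32 bytes */, duration: number }.
interface PlaylistEntry { extinf: number; url: string; }

let lastSequenceId = -1;          // -1 = request the full playlist first
const playlist: PlaylistEntry[] = [];

// Hypothetical helper: fake URL derived from the 32 byte hash
const makeFakeUrl = (hash: Uint8Array) =>
    Buffer.from(hash).toString('hex') + '.ts';

function applyDelta(entries: { hash: Uint8Array; duration: number }[]): void {
    for (const e of entries) {
        playlist.push({ extinf: e.duration, url: makeFakeUrl(e.hash) });
        playlist.shift();         // remove one old entry per new one (sliding window)
        lastSequenceId++;         // advances the media sequence header automatically
    }
}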

Proxy Client:
-Cache all video frames for 1-2 minutes so dual streams (video/webcam, 3DS screen 1/2) won't consume extra bandwidth (see the cache sketch below)


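A minimal TTL cache sketch for that; the string key and the 90 s TTL are illustrative choices within the stated 1-2 minute range:

// Sketch: time-limited video frame cache so a second consumer hits memory, not the network.
const FRAME_TTL_MS = 90_000;

const frameCache = new Map<string, { frame: Uint8Array; expires: number }>();

function putFrame(id: string, frame: Uint8Array): void {
    frameCache.set(id, { frame, expires: Date.now() + FRAME_TTL_MS });
}

function getFrame(id: string): Uint8Array | undefined {
    const hit = frameCache.get(id);
    if (!hit) return undefined;
    if (hit.expires < Date.now()) { frameCache.delete(id); return undefined; }
    return hit.frame;
}

// Periodic sweep so stale frames don't pile up between reads
setInterval(() => {
    const now = Date.now();
    for (const [id, v] of frameCache) if (v.expires < now) frameCache.delete(id);
}, 30_000);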
General Client:
-Ensure too-high backpressure to the server triggers a visible alert telling the user to check their connection. The user sees a warning to stop playing


MW:

Clustering:
-Use Node worker_threads
-Can use this if uWS is clustered and data needs to be shared between workers to achieve pub/sub to all needed clients: http://prntscr.com/w1exdh
-Try to avoid clustering uWS if too much data needs to be shared (it works if the data is cached in every worker, changes are strongly isolated to individual fields, and global tasks are pushed via message port to the main thread, like updating the room list. Alternatively use a Redis DB with pub/sub, e.g. one client updates its own item array in the Redis DB, which the other workers then retrieve and sync to all of their clients)
-Worker data could be shared efficiently by using 2 SharedArrayBuffers per worker pair (or worker-main thread pair), each big enough to hold every possible payload. Simply copy the eventBuffer in there, move the ptr past the data for the next block, then send a short message to every worker with the ptr value. Workers can then all access the same data and broadcast it via uWS (1 ms delay max) (see the SharedArrayBuffer sketch below)
-Use this to quickly broadcast to every worker from a worker: https://prnt.sc/w1f6vz
-If multiple servers are involved, definitely use Redis for inter-server communication
-Use memory caches as much as possible and delay persistence to the DB
-Always use the async fs methods instead of the sync ones to free the event loop
-Split isolated CPU-heavy sub tasks like event buffer encoding/decoding and compress/decompress into a worker thread pool and make them async to free the event loop (event buffer encoding/decoding would already be covered by splitting uWS into a worker pool; see the pool sketch after this list)
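A sketch of the SharedArrayBuffer idea; the slab size, the message shape and the wrap-around policy are assumptions:

// Sketch: sharing event buffers between workers via a SharedArrayBuffer.
// Each worker is assumed to receive `slab` via workerData when it is spawned.
import { Worker } from 'worker_threads';

const SLAB_SIZE = 16 * 1024 * 1024;   // "big enough to hold every possible data" (assumed size)
const slab = new SharedArrayBuffer(SLAB_SIZE);
const slabU8 = new Uint8Array(slab);
let writePtr = 0;

// Copy the eventBuffer in, move the ptr past it, then send every worker the offset
function publish(eventBuffer: Uint8Array, workers: Worker[]): void {
    if (writePtr + eventBuffer.length > SLAB_SIZE) writePtr = 0; // naive wrap-around
    slabU8.set(eventBuffer, writePtr);
    const offset = writePtr;
    writePtr += eventBuffer.length;
    for (const w of workers) w.postMessage({ offset, length: eventBuffer.length });
}

// Worker side (its own Uint8Array view over the same slab):
// parentPort.on('message', ({ offset, length }) => {
//     const view = slabU8.subarray(offset, offset + length);
//     app.publish('topic', view, true); // broadcast via uWS
// });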
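And a minimal worker pool sketch for the CPU-heavy sub tasks; './codec-worker.js' is a hypothetical worker script that posts exactly one result message per job message:

// Sketch: tiny worker pool so encode/decode and compress/decompress run off the event loop.
import { Worker } from 'worker_threads';
import * as os from 'os';

type Job = { data: unknown; resolve: (result: unknown) => void };

const idle: Worker[] = [];
const queue: Job[] = [];
const running = new Map<Worker, Job>();

for (let i = 0; i < Math.max(1, os.cpus().length - 1); i++) {
    const w = new Worker('./codec-worker.js');  // hypothetical worker script
    w.on('message', (result) => {
        running.get(w)?.resolve(result);        // finish the job this worker was running
        running.delete(w);
        const next = queue.shift();
        if (next) { running.set(w, next); w.postMessage(next.data); }
        else idle.push(w);
    });
    idle.push(w);
}

// Async entry point: resolves when a pool worker posts the result back
export function runJob(data: unknown): Promise<unknown> {
    return new Promise((resolve) => {
        const job: Job = { data, resolve };
        const w = idle.pop();
        if (w) { running.set(w, job); w.postMessage(job.data); }
        else queue.push(job);
    });
}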


Compression:
-Disabled in uWS due to the more optimal binary format
-If a message contains a lot of text in a single field, that field should be turned into a buffer which the app compresses/decompresses. If the message contains many different strings, compress the entire message (always the msg type) and pack it into a wrapper message which carries namespace/name (and nonce); the inner message skips the nonce because it is msg type. Later the outer message gets unwrapped, the buffer decompressed, and then parsed again as the inner message into the final data object by the app (see the wrapper sketch below)
-Node: zlib module, gzip/deflate
-Browser: wasm gzip decode, use: https://github.com/drbh/wasm-flate
-Takes around 2.3 ms for a roundtrip
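A sketch of the wrapper scheme on the Node side; the 3-byte outer header (namespace | event | 1-byte nonce placeholder) is an assumed layout, not the real format:

// Sketch: app-level compression of an inner message, wrapped in an outer message.
import { gzipSync, gunzipSync } from 'zlib';

function wrapCompressed(namespaceId: number, eventId: number, innerMessage: Buffer): Buffer {
    const compressed = gzipSync(innerMessage);
    const outer = Buffer.alloc(3 + compressed.length);
    outer[0] = namespaceId;
    outer[1] = eventId;
    outer[2] = 0;                      // nonce placeholder (assumed 1 byte here)
    compressed.copy(outer, 3);
    return outer;
}

function unwrap(outer: Buffer): Buffer {
    // decompress the payload, then parse the result as the inner message
    return gunzipSync(outer.subarray(3));
}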



Angular postMessage setup between GUI and preload script:
-Set up two MessagePorts to isolate messages (one-time transfer of the port on init): https://developer.mozilla.org/en-US/docs/Web/API/MessagePort/postMessage
-Use something like the existing message system and the Microsoft wrapper around postMessage to share data and promisify it
-Use SharedArrayBuffer to avoid extra serialization and memory allocation
-Have a utility function which returns an awaitable promise, sets the namespace automatically and merely requires the event name and data (see the sketch below)
-Passthrough data from the server to the emu, for example, is a message buffer wrapped in another outer message buffer indicating passthrough. The GUI unwraps the outer layer and copies the inner buffer into a shared memory buffer which is passed to the preload script, which interprets the inner message for final C++ execution (can possibly also send the inner buffer as a Transferable to avoid the copy)
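A sketch of that awaitable utility over a transferred MessagePort; the message shape ({ id, namespace, event, data }) and the default namespace are assumptions:

// Sketch: promisified request/response over a MessagePort.
let nextId = 0;
const pending = new Map<number, (data: unknown) => void>();
let port: MessagePort;

export function initPort(p: MessagePort): void {
    port = p; // one-time transferred port from the preload script
    port.onmessage = (ev: MessageEvent) => {
        const { id, data } = ev.data;
        pending.get(id)?.(data);
        pending.delete(id);
    };
}

// Utility: awaitable, sets the namespace automatically, only needs event name + data
export function request(event: string, data: unknown, namespace = 'emu'): Promise<unknown> {
    return new Promise((resolve) => {
        const id = nextId++;
        pending.set(id, resolve);
        port.postMessage({ id, namespace, event, data });
    });
}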



Console socket:
-Important: Read ints unsigned into the appropriate type first. Then, if negative, multiply by -1 and cast at the same time; otherwise just cast into the final type as dictated by the descriptor (or, if negative, do value * -1 and save the result in an int64 so it always fits, then memcpy from the correct position in the int64 to the correct position of the result value, e.g. an int16)
-Use TCP_NODELAY and avoid small sends; send entire messages at once (as much as send allows) unless it is a 1 byte control message
-Need to create a single buffer big enough to handle every possible type of message (on WiiVC we can probably allocate a 1 MB one)
-recv in a non-blocking loop constantly until bytes received > 0 (supply the entire 1 MB buffer and take note how much we read)
-Interpret the control byte: 0 = ping (send pong back), 1 = pong (do nothing, but the connection is still open), 2 = message
-If bytes read > 1 and the format is not message, call the action loop again with the pointer moved ahead (we got multiple messages at once)
-Extend the message format just for raw sockets with a sizeLength byte and a 1/2/4 byte length of the entire message, so the socket knows immediately how many bytes to receive in total before processing the message. Can add +5 bytes to each maxPayloadSize automatically and, on message creation, start writing at position 5. After the message is created, backfill the size and size byte and ensure the returned array view slices away any padding bytes at the start
-If the type is message, queue more recv reads with a global timeout to avoid blocking, until the total read count is high enough to process the size header first and then the entire message. The next received byte is sizeLength; repeat for the actual size. If it all succeeds, go to the message interpreter (see the framing sketch after this list)
-If the type is message and at least 2 more bytes have arrived, read namespace and event and grab the descriptor
-Interpret the data according to the descriptor
-If the recv read count was higher than the message needed, we got multiple messages. Enter the action loop again with the pointer moved ahead
-Socket sends need to check if the entire buffer was actually delivered size-wise; if not, keep doing sends until we delivered the entire length of the message we want to send (not of the backing buffer itself)
-Periodically schedule a ping message to confirm the connection is still alive (only when not already in send or read mode, of course. Invoke from the main loop after enough time has passed and the reads for this cycle had no incoming data. The Node server must only reply with pong if not already in a read/write cycle)
-The receiver on the server protects itself by enforcing a limit on each message length. If the total bytes received in a short timeframe from the same IP exceed the set limit, also disconnect asap; same if the number of received batches/reads exceeds the limit (call limit + byte limit + message size limit). If the message parser fails due to bad/invalid data, also disconnect
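A sketch of that framing on the Node side, assuming the per-message layout control(1) | sizeLength(1) | size(1/2/4, length of the entire message) | namespace(1) | event(1) | payload; the exact widths and big-endian size are assumptions:

// Sketch: incremental parser for the raw-socket framing described above.
let pendingBytes = Buffer.alloc(0);

function onData(chunk: Buffer): void {
    pendingBytes = Buffer.concat([pendingBytes, chunk]);
    while (pendingBytes.length > 0) {
        const control = pendingBytes[0];
        if (control === 0) { /* ping -> send pong */ pendingBytes = pendingBytes.subarray(1); continue; }
        if (control === 1) { /* pong -> still alive */ pendingBytes = pendingBytes.subarray(1); continue; }
        // control === 2: message; need the sizeLength byte first
        if (pendingBytes.length < 2) return;            // wait for more bytes
        const sizeLength = pendingBytes[1];             // 1, 2 or 4
        if (pendingBytes.length < 2 + sizeLength) return;
        const total =
            sizeLength === 1 ? pendingBytes.readUInt8(2) :
            sizeLength === 2 ? pendingBytes.readUInt16BE(2) :
                               pendingBytes.readUInt32BE(2); // size of the entire message
        if (pendingBytes.length < total) return;        // message not complete yet
        const msg = pendingBytes.subarray(0, total);
        const namespace = msg[2 + sizeLength];
        const event = msg[3 + sizeLength];
        dispatch(namespace, event, msg.subarray(4 + sizeLength)); // descriptor-based parse
        pendingBytes = pendingBytes.subarray(total);    // next message may already be here
    }
}

declare function dispatch(ns: number, ev: number, payload: Buffer): void; // placeholder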


Note: How do I construct a specification for the structure of a datagram and know when the datagram has ended? You need a protocol!

This is the basics of how the simplest of protocols works:

Define a fixed size header. This header will be a number of bytes that tells you how long the message is. Put any meta information in the header.
The important part is that the length of the header is fixed. We'll call this header length HEADER_LEN.
When receiving a message, construct a buffer and keep writing to that buffer until you've received at least HEADER_LEN bytes.
Split the received bytes into header, extra, where header is the header bytes and extra are any additional bytes you received while consuming the header.
Parse the header.
Let's say you've defined HEADER_LEN to be 5 bytes = [4 BYTE INT + NULL].
Parse the header's 4 byte int into a body_length variable - this will be an integer telling us how long the body is. This design assumes that the CLIENT constructed a properly composed header of at least 5 bytes according to our specification.
...
If it didn't, we have another mess of issues to deal with, namely throwing away the malformed message and finding the next well-formed message.
Unfortunately an error-correction conversation would get too long for this post.
Read an additional body_length bytes from the socket. This includes the extra bytes we've already received.
You've now received the entire datagram.
Wait for the next datagram and repeat. WARNING: Data from the next datagram might already be available from the prior body reads. Figure this out from the header size, so you can isolate the next datagram right away (it starts where the first one ends)




Server script:
-Split the MW handling code into separate JS modules which can be open sourced safely without exposing the server code

Console read/write security:
-The preload Electron script uses sub modules which interact with C++. The C++ module only does auth/hook and exposes read/write functions for every data type. The preload script doesn't allow emu reads/writes if the main window is not pointed at the mw.ootrandomizer.com domain (which is the only window with access to the preload script). Block these calls unless an active session is in progress, to avoid bad code taking over the emulator (see the gate sketch below)
-The preload script modules do the final parsing of the postMessage messages and execute the console reads/writes in batch for the respective event via C++ calls
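A sketch of that gate in the preload script; ALLOWED_ORIGIN, sessionActive and the native binding are hypothetical names:

// Sketch: gating emu read calls on origin + active session before touching C++.
declare const native: { read(addr: number, size: number): Buffer }; // hypothetical C++ binding

const ALLOWED_ORIGIN = 'https://mw.ootrandomizer.com';
let sessionActive = false; // flipped by the session lifecycle events

export function guardedRead(addr: number, size: number): Buffer {
    if (window.location.origin !== ALLOWED_ORIGIN)
        throw new Error('read blocked: wrong origin');
    if (!sessionActive)
        throw new Error('read blocked: no active session');
    return native.read(addr, size);
}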


#include <sys/types.h>   // ssize_t
#include <sys/socket.h>  // recv()

ssize_t recv_all(int socket, char *buffer_ptr, size_t bytes_to_recv)
{
    size_t original_bytes_to_recv = bytes_to_recv;

    // Continue looping while there are still bytes to receive
    while (bytes_to_recv > 0)
    {
        ssize_t ret = recv(socket, buffer_ptr, bytes_to_recv, 0);
        if (ret <= 0)
        {
            // Error or connection closed
            return ret;
        }

        // We have received ret bytes
        bytes_to_recv -= ret; // Decrease size to receive for next iteration
        buffer_ptr += ret;    // Increase pointer to point to the next part of the buffer
    }

    return original_bytes_to_recv; // Now all data have been received
}

// Usage
// Our data buffer
char buffer[the_full_size_of_data];

// Receive all data
recv_all(socket, buffer, sizeof buffer); // TODO: Add error checking

-The console should create lists on start that map the namespace/event names and IDs to class-static create/parse function pointers. When invoked after the namespace/event lookup, they either parse an existing buffer from recv into a new class instance, or cast the submitted ptr to a class instance and write it out to a new event buffer for dispatch. Use template functions to map type IDs to class names (see the example below)

#include <iostream>
#include <string>

using namespace std;

class Foo {
public:
    // Static member functions decay to plain function pointers, so this must be
    // a plain function pointer type, not a pointer-to-member as in the original
    typedef string (*staticMethod)();

    static staticMethod current;

    static string ohaiWorld() {
        return "Ohai, world!";
    }

    static string helloWorld() {
        return "Hello, world!";
    }

    static string callCurrent() {
        return Foo::current();
    }
};

Foo::staticMethod Foo::current = &Foo::ohaiWorld;

int main() {
    cout << Foo::callCurrent() << endl;
    Foo::current = &Foo::helloWorld;
    cout << Foo::callCurrent() << endl;
    return 0;
}

std::type_index(typeid(obj)) == std::type_index(typeid(CSubClass)) will be true if and only if the two types are equal!

Function ptr to a static member function (PPMF2 being a plain function pointer typedef matching the signature):
PPMF2 myfunc = &MyClass::StaticMemberFunc;

Auth:
Use 2-factor auth:
-The client calls an Express URL on the server which associates the user IP with a sliding 2 byte nonce (valid for 5 minutes before expiring; can be overwritten by the same IP). The server confirms same-origin by ensuring the Origin header is set and points to our server. If true, return the nonce value. Enforce strong rate limits on this endpoint, so that the same IP bounces if it calls it more than twice in a short timeframe
-The client opens the Twitch login page with the nonce set as the CSRF token
-The user has to log in, or gets auto-redirected if already logged in with the cookie set. The redirect URL points at our server
-The server ensures the CSRF token supplied by Twitch matches the one stored for this IP (and also confirms the origin to be Twitch). Exchange the key for the token and ensure it matches our application ID via a Twitch backend check. If true, generate a custom auth token (256 bit) and associate it with the user details as received from the Twitch API (user ID, user_name, etc.) plus the user IP address. Send OK and the custom auth token to the client
-Only IPs that are authorized via "?token=" + custom auth token can get past the first check in the ws upgrade handler, after confirming same origin. If the token is marked as first-ever usage, enforce that the IP matches the one which generated the token. After that, set the flag to false to allow a changed IP to re-connect to the socket using the same token
-If the user was already connected, disconnect every other websocket associated with this user
-Upgraded websockets thus automatically have full access to every endpoint and are associated with a user
-If the user disconnects unexpectedly, schedule the custom auth token for deletion 10 minutes from now. A re-connect before then cancels the timer
-A disconnected client verifies the custom auth token is still valid via a verify-token endpoint on our server, otherwise kick the user back to the full auth flow. If the server rejects during upgrade, do the same
-The C++ app independently performs a check every 10 minutes (and on app start) of whether any user connected to our backend originates from this request's IP address (or was connected within the last 10 minutes). If not, the C++ app crashes itself (the console socket does the same, plus an additional auth flow when connecting) (see the token sketch below)
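A sketch of two pieces of that flow, the nonce issuance and the upgrade token check; the map shapes are assumptions, and the Twitch exchange itself is omitted:

// Sketch: sliding nonce per IP + first-use IP binding for the custom auth token.
const nonces = new Map<string, { nonce: number; expires: number }>();
const tokens = new Map<string, { userId: string; ip: string; firstUse: boolean }>();

function issueNonce(ip: string): number {
    const nonce = Math.floor(Math.random() * 0x10000); // sliding 2 byte nonce
    nonces.set(ip, { nonce, expires: Date.now() + 5 * 60_000 }); // overwrites for the same IP
    return nonce;
}

function checkUpgrade(token: string, ip: string): boolean {
    const entry = tokens.get(token);
    if (!entry) return false;
    if (entry.firstUse) {
        if (entry.ip !== ip) return false; // first use must come from the issuing IP
        entry.firstUse = false;            // later re-connects may use a changed IP
    }
    return true;
}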


User data storage:
NOTE: Also cache user data in a map per userID after auth to reduce database load. Save the userID in the connected socket objects. The app can then retrieve user profiles quickly per user ID, and the socket can retrieve its own profile immediately. Update the values there when the user updates their profile and only UPDATE the DB afterwards. Do the same for MW rooms that are opened (room list, room object with status etc.). In the user profiles, store the websocket ID or object too. The app can then quickly access sockets and users with 2-way binding (see the sketch below)
-Cache everything (room list, per-room object, racer list). Ensure the socket is corked if multiple messages are sent in a row (e.g. after joining a new room)

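A sketch of the 2-way binding between sockets and cached profiles; the socket type and profile fields are placeholders:

// Sketch: userID on the socket, socket on the profile, profiles cached per userID.
interface Profile { userId: string; userName: string; socket?: unknown; }

const usersById = new Map<string, Profile>();

function onAuth(socket: { userId?: string }, profile: Profile): void {
    socket.userId = profile.userId;   // socket -> user
    profile.socket = socket;          // user -> socket
    usersById.set(profile.userId, profile);
}

function updateProfile(userId: string, patch: Partial<Profile>): void {
    const p = usersById.get(userId);
    if (!p) return;
    Object.assign(p, patch);          // update the cache immediately...
    // ...and schedule the DB UPDATE afterwards (persistence is delayed)
}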


SSL refresh functions in uWS ("Hult Socket"):
/* SNI */
addServerName
removeServerName
missingServerName

app.addServerName("localhost", {
    key_file_name: 'misc/key.pem',
    cert_file_name: 'misc/cert.pem',
    passphrase: '1234'
});

Can use these to remove the server name when the cert changes and add it again, or add it over the old one. If that doesn't work, create a backup domain which points at the same server, and have an auto-refreshing master Express server sit in front of the socket server which sends the current socket domain:port back to the client for connection; the domain sent back is swapped after a cert refresh. The socket server then adds the new server name and cert upon missingServerName and gracefully shuts down the old one

Can also spawn multiple servers using worker threads (one using the new cert) and then gracefully shut down the other worker:

/* This example spawns two worker threads, each with their own
 * server listening to the same port (Linux feature). */

const uWS = require('../dist/uws.js');
const port = 9001;
const { Worker, isMainThread, threadId } = require('worker_threads');
const os = require('os');

if (isMainThread) {
    /* Main thread loops over all CPUs */
    /* In this case we only spawn two (hardcoded) */
    /*os.cpus()*/[0, 1].forEach(() => {
        /* Spawn a new thread running this source file */
        new Worker(__filename);
    });

    /* I guess main thread joins by default? */
} else {
    /* Here we are inside a worker thread */
    const app = uWS.SSLApp({
        key_file_name: 'misc/key.pem',
        cert_file_name: 'misc/cert.pem',
        passphrase: '1234'
    }).get('/*', (res, req) => {
        res.end('Hello Worker!');
    }).listen(port, (token) => {
        if (token) {
            console.log('Listening to port ' + port + ' from thread ' + threadId);
        } else {
            console.log('Failed to listen to port ' + port + ' from thread ' + threadId);
        }
    });
}

The first (old) worker would just refuse new connections by closing its listening socket, so the new worker with the correct cert takes over. Then, once everybody has disconnected from the first worker, it gets destroyed automatically. The signal to switch is received when chokidar detects a change to the letsencrypt key/cert file

us_listen_socket_close can be called standalone, e.g. when the cert refresh happens, to remove a server from accepting new connections while a different server takes over; the old server can still serve its existing connections until they disconnect themselves. Once all have disconnected, uWS cleans up automatically

Likely actual flow: Spin up 2-3 Node workers for 2-3 cores which share the same socket port and all the user/app memory/DB access. When the cert changes (detected via a chokidar watch), spin up 2-3 new workers with the new cert, wait until they are for sure ready, and then close the listening socket on the original 2-3 workers. Users eventually disconnect themselves and the old workers end




// Angular example of a websocket service
websocket.service.ts:

import { Injectable } from '@angular/core';
import { Subject, Observer, Observable } from 'rxjs/Rx';

@Injectable()
export class WebsocketService {
    public createWebsocket(): Subject<MessageEvent> {
        let socket = new WebSocket('wss://echo.websocket.org');
        let observable = Observable.create(
            (observer: Observer<MessageEvent>) => {
                socket.onmessage = observer.next.bind(observer);
                socket.onerror = observer.error.bind(observer);
                socket.onclose = observer.complete.bind(observer);
                return socket.close.bind(socket);
            }
        );
        // Completed from the truncated snippet: outgoing side, joined into a Subject
        let observer = {
            next: (data: Object) => {
                if (socket.readyState === WebSocket.OPEN) {
                    socket.send(JSON.stringify(data));
                }
            }
        };
        return Subject.create(observer, observable);
    }
}
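Hypothetical usage from a component (assumes the service is injected as wsService):

const subject = this.wsService.createWebsocket();
subject.subscribe((ev: MessageEvent) => console.log('received:', ev.data));
subject.next({ hello: 'world' }); // echoed back by wss://echo.websocket.org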




Good article: https://dev.to/mattkrick/replacing-express-with-uwebsockets-48ph
Useful for file serving: https://github.com/uNetworking/uWebSockets.js/discussions/407


OLD:

Storing ws connections in a map with ID:

const wsMap = new Map();

app.ws('/', {
    open: (ws) => {
        addToMapSet(ws, ws.uid, wsMap);
    },
    close: (ws, code, message) => {
        removeFromMapSet(ws, ws.uid, wsMap);
    }
});

// Add item to the Set for this ID, creating the Set on first use
const addToMapSet = (item, id, map) => {
    if (!map.get(id)?.add(item)) { const s = new Set(); s.add(item); map.set(id, s); }
};
// Remove item and drop the Set once empty (guard added: the Set may already be gone)
const removeFromMapSet = (item, id, map) => {
    const set = map.get(id);
    if (!set) return;
    set.delete(item);
    if (!set.size) map.delete(id);
};


Rate Limit:
// rateLimit(ws) returns true if over limit
const RateLimit = (limit, interval) => {
    let now = 0;
    const last = Symbol(), count = Symbol();
    setInterval(() => ++now, interval); // advance the time bucket
    return (ws) => {
        if (ws[last] != now) {
            // first message in this bucket
            ws[last] = now;
            ws[count] = 1;
        } else {
            return ++ws[count] > limit;
        }
    };
};

const rateLimit = RateLimit(5, 1000);
const rateLimit2 = RateLimit(1, 2000);
const rateLimit3 = RateLimit(10, 3000); // limit per interval (milliseconds)


Backpressure:
If the client disconnects from the server, or gets disconnected by the server due to too-high backpressure, warn the user that the connection is unstable and should be checked (the proxy may try to switch playlists automatically if a lower quality is available)
Proxy Server: High (1 MB), Critical (10 MB)
Proxy Client: High (1 KB), Critical (2 KB)
MW Server/Client: High (100 KB), Critical (1 MB)

General:
Similarly to Http, methods such as ws.send(...) can cause backpressure. Make sure to check ws.getBufferedAmount() before sending, and check the return value of ws.send before sending any more data. WebSockets do not have .onWritable, but instead make use of the .drain handler of the websocket route handler.

Inside the .drain event you should check ws.getBufferedAmount(); it might have drained, or even increased. Most likely it drained, but don't assume that it has - the .drain event is only a hint that it has changed. Only if fully drained, resume the socket sending queue

Use this to manage backpressure and delay writes until the client has caught up before sending more (see the sketch below)
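A sketch of such a send queue; HIGH_WATER reuses the MW server threshold above, and the per-socket queue shape is an assumption:

// Sketch: backpressure-aware send queue for a uWS websocket.
const HIGH_WATER = 100 * 1024; // MW server: High (100 KB)

const queues = new WeakMap<any, Uint8Array[]>();

function safeSend(ws: any, msg: Uint8Array): void {
    const queue = queues.get(ws) ?? [];
    queues.set(ws, queue);
    if (queue.length > 0 || ws.getBufferedAmount() > HIGH_WATER) {
        queue.push(msg); // too much buffered: hold until .drain fires
        return;
    }
    ws.send(msg, true); // binary
}

// Wire into the route handler as: drain: (ws) => flush(ws)
function flush(ws: any): void {
    const queue = queues.get(ws);
    if (!queue) return;
    while (queue.length > 0 && ws.getBufferedAmount() === 0) {
        ws.send(queue.shift()!, true); // only resume once fully drained
    }
}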


Corking:

For Http:
Corking a response is a performance improvement in both CPU and network, as you ready the IO system for writing multiple chunks at once. By default, you're corked in the immediately executing top portion of the route handler. In all other cases, such as when returning from await, or when being called back from an async database request or anything that isn't directly executing in the route handler, you'll want to cork before calling writeStatus, writeHeader or just write. Corking takes a callback in which you execute the writeHeader, writeStatus and such calls, in one atomic IO operation. This is important not only for TCP but definitely for TLS, where each write would otherwise result in one TLS block being sent off, each with one send syscall.

Example usage:
res.cork(() => { res.writeStatus("200 OK").writeHeader("Some", "Value").write("Hello world!"); });

So if a socket plans to do multiple writes at once, do an async cork first before sending them off. If only a single write is needed, skip the corking (the socket will be auto-corked if possible, which is faster). This is only needed if async calls happen: a fully sync handler, or the first part of an async handler up to the first await, is properly corked automatically by uWS. But once awaits happen, ensure we cork manually before subsequent writes (see the sketch below)

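A sketch of the cork-after-await case; app is the uWS app and fetchFromDb is a hypothetical async call:

// Sketch: manual cork after an await in a uWS route handler.
declare const app: any;                                        // uWS app instance
declare function fetchFromDb(id: string): Promise<unknown>;    // hypothetical DB call

app.get('/room/:id', async (res: any, req: any) => {
    res.onAborted(() => { res.aborted = true; });
    const room = await fetchFromDb(req.getParameter(0));       // auto-cork ends at this await
    if (res.aborted) return;
    res.cork(() => {
        // all writes batched into one atomic IO operation
        res.writeStatus('200 OK').writeHeader('Content-Type', 'application/json');
        res.end(JSON.stringify(room));
    });
});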