Guest User

Untitled

a guest
Sep 22nd, 2020
512
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 13.51 KB | None | 0 0
  1. 'use strict';
  2.  
  3. const EventEmitter = require('events');
  4. const WebSocketShard = require('./WebSocketShard');
  5. const PacketHandlers = require('./handlers');
  6. const { Error: DJSError } = require('../../errors');
  7. const Collection = require('../../util/Collection');
  8. const { Events, ShardEvents, Status, WSCodes, WSEvents } = require('../../util/Constants');
  9. const Util = require('../../util/Util');
  10.  
  // Close codes listed here make the shard CLOSE listener drop the shard's session ID
  // before it is requeued, so the shard cannot resume its previous session.
  const UNRESUMABLE_CLOSE_CODES = [1000, 4006, 4007];

  // Every key of WSCodes except the first one, as numbers. A close with one of these codes is
  // reported as a final disconnect instead of triggering a reconnect.
  // NOTE(review): the skipped first key is presumably 1000 (handled separately by the CLOSE
  // listener) — confirm against WSCodes in util/Constants.
  const UNRECOVERABLE_CLOSE_CODES = Object.keys(WSCodes)
    .map(code => Number(code))
    .slice(1);

  // Dispatch events that WebSocketManager#handlePacket processes immediately even while the
  // manager is not ready; every other event is queued until the manager becomes ready.
  const BeforeReadyWhitelist = [
    'READY',
    'RESUMED',
    'GUILD_CREATE',
    'GUILD_DELETE',
    'GUILD_MEMBERS_CHUNK',
    'GUILD_MEMBER_ADD',
    'GUILD_MEMBER_REMOVE',
  ].map(eventName => WSEvents[eventName]);
  25.  
  /**
   * The WebSocket manager for this client.
   * <info>This class forwards raw dispatch events,
   * read more about it here {@link https://discord.com/developers/docs/topics/gateway}</info>
   * @extends EventEmitter
   */
  class WebSocketManager extends EventEmitter {
    /**
     * @param {Client} client The client that instantiated this WebSocketManager
     */
    constructor(client) {
      super();

      /**
       * The client that instantiated this WebSocketManager
       * @type {Client}
       * @readonly
       * @name WebSocketManager#client
       */
      Object.defineProperty(this, 'client', { value: client });

      /**
       * The gateway this manager uses
       * @type {?string}
       */
      this.gateway = undefined;

      /**
       * The amount of shards this manager handles
       * @private
       * @type {number}
       */
      this.totalShards = this.client.options.shards.length;

      /**
       * A collection of all shards this manager handles
       * @type {Collection<number, WebSocketShard>}
       */
      this.shards = new Collection();

      /**
       * A set of shards to be connected or that need to reconnect
       * @type {Set<WebSocketShard>}
       * @private
       * @name WebSocketManager#shardQueue
       */
      Object.defineProperty(this, 'shardQueue', { value: new Set(), writable: true });

      /**
       * An array of queued events before this WebSocketManager became ready
       * @type {object[]}
       * @private
       * @name WebSocketManager#packetQueue
       */
      Object.defineProperty(this, 'packetQueue', { value: [] });

      /**
       * The current status of this WebSocketManager
       * @type {number}
       */
      this.status = Status.IDLE;

      /**
       * If this manager was destroyed. It will prevent shards from reconnecting
       * @type {boolean}
       * @private
       */
      this.destroyed = false;

      /**
       * If this manager is currently reconnecting one or multiple shards
       * @type {boolean}
       * @private
       */
      this.reconnecting = false;

      /**
       * The current session limit of the client
       * @private
       * @type {?Object}
       * @prop {number} total Total number of identifies available
       * @prop {number} remaining Number of identifies remaining
       * @prop {number} reset_after Number of milliseconds after which the limit resets
       */
      this.sessionStartLimit = undefined;
    }

    /**
     * The average ping of all WebSocketShards.
     * The sum of every shard's ping is divided by the number of shards, so the value is `NaN`
     * (0 / 0) for as long as no shard has been registered.
     * @type {number}
     * @readonly
     */
    get ping() {
      const sum = this.shards.reduce((a, b) => a + b.ping, 0);
      return sum / this.shards.size;
    }

    /**
     * Emits a debug message on the client, prefixed with the shard ID or with "Manager".
     * @param {string} message The debug message
     * @param {?WebSocketShard} [shard] The shard that emitted this message, if any
     * @private
     */
    debug(message, shard) {
      this.client.emit(Events.DEBUG, `[WS => ${shard ? `Shard ${shard.id}` : 'Manager'}] ${message}`);
    }

    /**
     * Connects this manager to the gateway.
     * Fetches the gateway URL, the recommended shard count and the session start limit, queues one
     * WebSocketShard per shard ID, waits if no identifies remain and then starts spawning the shards.
     * @returns {Promise<boolean>} The result of {@link WebSocketManager#createShards}
     * @private
     */
    async connect() {
      // Built before the request is sent and thrown only when the request fails with HTTP 401
      const invalidToken = new DJSError(WSCodes[4004]);
      const {
        url: gatewayURL,
        shards: recommendedShards,
        session_start_limit: sessionStartLimit,
      } = await this.client.api.gateway.bot.get().catch(error => {
        throw error.httpStatus === 401 ? invalidToken : error;
      });

      this.sessionStartLimit = sessionStartLimit;

      const { total, remaining, reset_after } = sessionStartLimit;

      this.debug(`Fetched Gateway Information\nURL: ${gatewayURL}\nRecommended Shards: ${recommendedShards}`);

      this.debug(`Session Limit Information\nTotal: ${total}\nRemaining: ${remaining}`);

      this.gateway = `${gatewayURL}/`;

      let { shards } = this.client.options;

      if (shards === 'auto') {
        this.debug(`Using the recommended shard count provided by Discord: ${recommendedShards}`);
        this.client.options.shardCount = recommendedShards;
        // Shard IDs 0 .. recommendedShards - 1
        shards = Array.from({ length: recommendedShards }, (_, i) => i);
        this.client.options.shards = shards;
      }

      this.totalShards = shards.length;
      this.debug(`Spawning shards: ${shards.join(', ')}`);
      this.shardQueue = new Set(shards.map(id => new WebSocketShard(this, id)));

      // The limit was fetched above, so pass it along instead of requesting it a second time
      await this._handleSessionLimit(remaining, reset_after);

      return this.createShards();
    }

    /**
     * Handles the creation of a shard.
     * Takes the first shard off the queue, attaches the manager's listeners to it once, connects it
     * and, while shards remain queued, waits 5 seconds, re-checks the session limit and repeats.
     * @returns {Promise<boolean>} `false` if the queue was empty, `true` once the queue has been drained
     * @private
     */
    async createShards() {
      // If we don't have any shards to handle, return
      if (!this.shardQueue.size) return false;

      // Sets iterate in insertion order, so this is the shard that has been queued the longest
      const [shard] = this.shardQueue;

      this.shardQueue.delete(shard);

      if (!shard.eventsAttached) this._attachShardEvents(shard);

      this.shards.set(shard.id, shard);

      try {
        await shard.connect();
      } catch (error) {
        if (error && error.code && UNRECOVERABLE_CLOSE_CODES.includes(error.code)) {
          throw new DJSError(WSCodes[error.code]);
          // Undefined if session is invalid, error event for regular closes
        } else if (!error || error.code) {
          this.debug('Failed to connect to the gateway, requeueing...', shard);
          this.shardQueue.add(shard);
        } else {
          throw error;
        }
      }
      // If we have more shards, add a 5s delay
      if (this.shardQueue.size) {
        this.debug(`Shard Queue Size: ${this.shardQueue.size}; continuing in 5 seconds...`);
        await Util.delayFor(5000);
        await this._handleSessionLimit();
        return this.createShards();
      }

      return true;
    }

    /**
     * Attaches the manager's lifecycle listeners to a shard and marks the shard as wired,
     * so that a shard which is queued again later does not get duplicate listeners.
     * @param {WebSocketShard} shard The shard to listen to
     * @private
     */
    _attachShardEvents(shard) {
      shard.on(ShardEvents.ALL_READY, unavailableGuilds => {
        /**
         * Emitted when a shard turns ready.
         * @event Client#shardReady
         * @param {number} id The shard ID that turned ready
         * @param {?Set<string>} unavailableGuilds Set of unavailable guild IDs, if any
         */
        this.client.emit(Events.SHARD_READY, shard.id, unavailableGuilds);

        if (!this.shardQueue.size) this.reconnecting = false;
        this.checkShardsReady();
      });

      shard.on(ShardEvents.CLOSE, event => {
        // A 1000 close is final only if the manager was destroyed; any other code is final
        // only if it is one of the unrecoverable codes
        if (event.code === 1000 ? this.destroyed : UNRECOVERABLE_CLOSE_CODES.includes(event.code)) {
          /**
           * Emitted when a shard's WebSocket disconnects and will no longer reconnect.
           * @event Client#shardDisconnect
           * @param {CloseEvent} event The WebSocket close event
           * @param {number} id The shard ID that disconnected
           */
          this.client.emit(Events.SHARD_DISCONNECT, event, shard.id);
          this.debug(WSCodes[event.code], shard);
          return;
        }

        if (UNRESUMABLE_CLOSE_CODES.includes(event.code)) {
          // These event codes cannot be resumed
          shard.sessionID = undefined;
        }

        /**
         * Emitted when a shard is attempting to reconnect or re-identify.
         * @event Client#shardReconnecting
         * @param {number} id The shard ID that is attempting to reconnect
         */
        this.client.emit(Events.SHARD_RECONNECTING, shard.id);

        this.shardQueue.add(shard);

        if (shard.sessionID) {
          this.debug(`Session ID is present, attempting an immediate reconnect...`, shard);
          // Skip the session limit check for this reconnect
          this.reconnect(true);
        } else {
          shard.destroy({ reset: true, emit: false, log: false });
          this.reconnect();
        }
      });

      shard.on(ShardEvents.INVALID_SESSION, () => {
        this.client.emit(Events.SHARD_RECONNECTING, shard.id);
      });

      shard.on(ShardEvents.DESTROYED, () => {
        this.debug('Shard was destroyed but no WebSocket connection was present! Reconnecting...', shard);

        this.client.emit(Events.SHARD_RECONNECTING, shard.id);

        this.shardQueue.add(shard);
        this.reconnect();
      });

      shard.eventsAttached = true;
    }

    /**
     * Handles reconnects for this manager.
     * Does nothing if a reconnect is already running or the manager is not ready. Errors without an
     * HTTP 401 status are retried after 5 seconds; on a 401 the `invalidated` event is emitted and
     * the manager is destroyed if that event has listeners, otherwise the whole client is destroyed.
     * @param {boolean} [skipLimit=false] If this reconnect should skip checking the session limit
     * @private
     * @returns {Promise<boolean>} `false` if the reconnect was skipped, otherwise `true`
     * (or the result of the retry when one was needed)
     */
    async reconnect(skipLimit = false) {
      if (this.reconnecting || this.status !== Status.READY) return false;
      this.reconnecting = true;
      try {
        if (!skipLimit) await this._handleSessionLimit();
        await this.createShards();
      } catch (error) {
        this.debug(`Couldn't reconnect or fetch information about the gateway. ${error}`);
        if (error.httpStatus !== 401) {
          this.debug(`Possible network error occurred. Retrying in 5s...`);
          await Util.delayFor(5000);
          // Lower the flag only so the retry gets past the guard at the top of this method
          this.reconnecting = false;
          // `await` is required here: without it the `finally` below runs as soon as the retry hits
          // its first await and clears the flag while the retry is still in progress, which lets a
          // shard close event start a second, concurrent reconnect
          return await this.reconnect();
        }
        // If we get an error at this point, it means we cannot reconnect anymore
        if (this.client.listenerCount(Events.INVALIDATED)) {
          /**
           * Emitted when the client's session becomes invalidated.
           * You are expected to handle closing the process gracefully and preventing a boot loop
           * if you are listening to this event.
           * @event Client#invalidated
           */
          this.client.emit(Events.INVALIDATED);
          // Destroy just the shards. This means you have to handle the cleanup yourself
          this.destroy();
        } else {
          this.client.destroy();
        }
      } finally {
        this.reconnecting = false;
      }
      return true;
    }

    /**
     * Broadcasts a packet to every shard this manager handles.
     * @param {Object} packet The packet to send
     * @private
     */
    broadcast(packet) {
      for (const shard of this.shards.values()) shard.send(packet);
    }

    /**
     * Destroys this manager and all its shards.
     * Calling it again after the manager was destroyed is a no-op.
     * @private
     */
    destroy() {
      if (this.destroyed) return;
      // The stack trace is included so the debug output shows what triggered the destruction
      this.debug(`Manager was destroyed. Called by:\n${new Error('MANAGER_DESTROYED').stack}`);
      this.destroyed = true;
      this.shardQueue.clear();
      for (const shard of this.shards.values()) shard.destroy({ closeCode: 1000, reset: true, emit: false, log: false });
    }

    /**
     * Handles the timeout required if we cannot identify anymore.
     * When called without arguments the current limit is fetched from the gateway endpoint first
     * and stored in {@link WebSocketManager#sessionStartLimit}.
     * @param {number} [remaining] The amount of remaining identify sessions that can be done today
     * @param {number} [resetAfter] The amount of time in which the identify counter resets
     * @returns {Promise<void>}
     * @private
     */
    async _handleSessionLimit(remaining, resetAfter) {
      if (typeof remaining === 'undefined' && typeof resetAfter === 'undefined') {
        const { session_start_limit } = await this.client.api.gateway.bot.get();
        this.sessionStartLimit = session_start_limit;
        remaining = session_start_limit.remaining;
        resetAfter = session_start_limit.reset_after;
        this.debug(`Session Limit Information\nTotal: ${session_start_limit.total}\nRemaining: ${remaining}`);
      }
      if (!remaining) {
        this.debug(`Exceeded identify threshold. Will attempt a connection in ${resetAfter}ms`);
        await Util.delayFor(resetAfter);
      }
    }

    /**
     * Processes a packet and queues it if this WebSocketManager is not ready.
     * Calling it without a packet only schedules the next queued packet, if there is one.
     * @param {Object} [packet] The packet to be handled
     * @param {WebSocketShard} [shard] The shard that will handle this packet
     * @returns {boolean} `false` if the packet was queued, `true` otherwise
     * @private
     */
    handlePacket(packet, shard) {
      if (packet && this.status !== Status.READY) {
        if (!BeforeReadyWhitelist.includes(packet.t)) {
          this.packetQueue.push({ packet, shard });
          return false;
        }
      }

      // Drain the queue one packet per call; each drained packet re-enters this method and
      // schedules the one after it
      if (this.packetQueue.length) {
        const item = this.packetQueue.shift();
        this.client.setImmediate(() => {
          this.handlePacket(item.packet, item.shard);
        });
      }

      if (packet && PacketHandlers[packet.t]) {
        PacketHandlers[packet.t](this.client, packet, shard);
      }

      return true;
    }

    /**
     * Checks whether the client is ready to be marked as ready.
     * Nothing happens unless every expected shard is registered and ready. If the
     * `fetchAllMembers` client option is set, the members of every available guild are fetched
     * first; a failure there is only logged and does not prevent the client from becoming ready.
     * @returns {Promise<void>}
     * @private
     */
    async checkShardsReady() {
      if (this.status === Status.READY) return;
      if (this.shards.size !== this.totalShards || this.shards.some(s => s.status !== Status.READY)) {
        return;
      }

      this.status = Status.NEARLY;

      if (this.client.options.fetchAllMembers) {
        try {
          const promises = this.client.guilds.cache.map(guild => {
            if (guild.available) return guild.members.fetch();
            // Return empty promise if guild is unavailable
            return Promise.resolve();
          });
          await Promise.all(promises);
        } catch (err) {
          this.debug(`Failed to fetch all members before ready! ${err}\n${err.stack}`);
        }
      }

      this.triggerClientReady();
    }

    /**
     * Causes the client to be marked as ready and emits the ready event.
     * @private
     */
    triggerClientReady() {
      this.status = Status.READY;

      this.client.readyAt = new Date();

      /**
       * Emitted when the client becomes ready to start working.
       * @event Client#ready
       */
      this.client.emit(Events.CLIENT_READY);

      // Start draining the packets that were queued while the manager was not ready
      this.handlePacket();
    }
  }
  438.  
  439. module.exports = WebSocketManager;
  440.  
Add Comment
Please, Sign In to add comment