Advertisement
Guest User

Untitled

a guest
Apr 19th, 2020
63
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 8.17 KB | None | 0 0
  1. import {Connection, Session, StreamEvent} from "openvidu-browser";
  2. import {onSignal, OnSignal_Session, signal, Signal_Session} from "../../openvidu/signal";
  3. import {Answer, Offer, PredefinedPeerConfig} from "../../openvidu/peer";
  4. import {WebRtcPeer} from "openvidu-browser/lib/OpenViduInternal/WebRtcPeer/WebRtcPeer";
  5. import {toArrayBuffer, toChunks, totalSize} from "../../file";
  6. import {assert, combineProcedures, Fetch, HandleMessage, jsonParseDefault, log, MimeType, noop, size} from "shared-ts";
  7. import {sendNowOrOnOpen} from "../../webrtc";
  8. import {HandleRecvMessage, ReceiveMessageSystem, SendMessage, Transport} from "./transports";
  9. import {ConnectionId} from "../../openvidu/openvidu";
  10. import {logMessage, MessageType} from "../../backend";
  11. import {Stream} from "../shared";
  12.  
  13. export type FileMessage = SendMessage & { file: File };
  14. export type FileTransport = Transport<FileMessage>;
  15. export type HandleFile = HandleRecvMessage<FileMessage>;
  16.  
  17. type FileMetadata = {
  18. name: string,
  19. type: MimeType,
  20. size: number,
  21. time: Date,
  22. system: ReceiveMessageSystem
  23. };
  24.  
  25. /**
  26. * Implementation based on WebRTC data channels, sends/receives files p2p.
  27. *
  28. * Uses Session.signal method for signaling.
  29. */
  30. export class DataChannelTransport implements FileTransport {
  31. private dataChannels: { [connectionId: string]: RTCDataChannel } = {};
  32. private peers: { [connectionId: string]: WebRtcPeer } = {};
  33. private signal: Signal_Session;
  34. private onSignal: OnSignal_Session;
  35. private peerConfig: PredefinedPeerConfig;
  36. private onMessage: HandleMessage = noop;
  37. /**
  38. * Queue need to don't break the file transfer order.
  39. */
  40. private messageQueue: FileMessage[] = [];
  41. private queueInProcess: boolean = false;
  42.  
  43. constructor(private session: Session, private fetch: Fetch) {
  44. this.signal = signal(session);
  45. this.onSignal = onSignal(session);
  46. this.peerConfig = {
  47. mode: 'sendrecv',
  48. mediaConstraints: {video: false, audio: false},
  49. simulcast: false,
  50. iceServers: undefined,
  51. mediaStream: new MediaStream() // Must be specified to onicecandidate will be fired
  52. };
  53.  
  54. this.negotiateDataChannels(session);
  55. }
  56.  
  57. send = (message: FileMessage): Promise<void> => {
  58. log('File added to queue:', message.file);
  59. this.messageQueue.push(message);
  60. return this.processQueue();
  61. };
  62.  
  63. /**
  64. * Sends all messages from queue.
  65. *
  66. * Messages will accumulate if not all channel are negotiaged or if queue is processing at that time.
  67. * For that reason queue is processed after channel initializing in {@see initDataChannel}.
  68. */
  69. private processQueue = async (): Promise<void> => {
  70. if (this.queueInProcess || !this.messageQueue.length || !this.allChannelsNegotiated()) {
  71. return;
  72. }
  73.  
  74. this.queueInProcess = true;
  75. log('Processing file queue...');
  76.  
  77. let message;
  78. while (message = this.messageQueue.shift()) {
  79. await this.sendFile(message);
  80. }
  81.  
  82. this.queueInProcess = false;
  83. };
  84.  
  85. private sendFile = async ({file, time}: FileMessage): Promise<void> => {
  86. const metadata = createFileMetadata(file, time, {
  87. from: this.session.connection.connectionId,
  88. stream: Stream.Subscriber
  89. });
  90.  
  91. for (const channel of Object.values(this.dataChannels)) {
  92. sendNowOrOnOpen(channel, JSON.stringify(metadata) as any);
  93.  
  94. // 16 KiB is safe chunk size: https://lgrahl.de/articles/demystifying-webrtc-dc-size-limit.html
  95. const buffers = toChunks(file, 16 * 1024).map(toArrayBuffer);
  96. for (const buffer of buffers) {
  97. sendNowOrOnOpen(channel, await buffer as any);
  98. }
  99. log('File sent:', file.name);
  100.  
  101. const {name, type, size} = metadata;
  102. await logMessage({
  103. type: MessageType.File,
  104. typeRelated: {name, type, size},
  105. time,
  106. connection: this.session.connection.connectionId,
  107. }, this.fetch);
  108. }
  109. };
  110.  
  111. onReceived = (handle: HandleFile): void => {
  112. let metadata: FileMetadata | undefined;
  113. let buffers: ArrayBuffer[] = [];
  114.  
  115. const hdl = ({data}: MessageEvent) => {
  116. if (typeof data === 'string') {
  117. metadata = jsonParseDefault(data);
  118. buffers = [];
  119. return;
  120. }
  121.  
  122. buffers.push(data);
  123. const {name, type, size, time, system} = metadata!;
  124. const currentSize = totalSize(buffers);
  125. handleSizeOverflow(metadata!, currentSize);
  126.  
  127. if (currentSize === size) {
  128. const file = new File(buffers, name, {type});
  129. handle({
  130. custom: {file, time},
  131. system
  132. });
  133. buffers = [];
  134. metadata = undefined;
  135. }
  136. };
  137.  
  138. this.addMessageHandler(hdl);
  139. };
  140.  
  141. private addMessageHandler = (handle: HandleMessage): void => {
  142. this.onMessage = combineProcedures(this.onMessage, handle);
  143.  
  144. for (const channel of Object.values(this.dataChannels)) {
  145. channel.onmessage = this.onMessage;
  146. }
  147. };
  148.  
  149. private negotiateDataChannels = (session: Session): void => {
  150. session.on('streamCreated', async (event) => {
  151. log('Stream created, start offer sending');
  152. const connection = (<StreamEvent>event).stream.connection;
  153. const offer = new Offer(this.signal(connection), this.onSignal(connection));
  154.  
  155. const peer = offer.createOfferer(this.peerConfig);
  156. this.peers[connection.connectionId] = peer;
  157. const channel = peer.pc.createDataChannel('fileSending');
  158. await this.initDataChannel(channel, connection);
  159. return offer.sendOffer(peer);
  160. });
  161.  
  162. const answer = new Answer(this.signal, this.onSignal());
  163. answer.onOffer(this.peerConfig, (peer, from) => {
  164. this.peers[from.connectionId] = peer;
  165. peer.pc.ondatachannel = ({channel}: RTCDataChannelEvent) => this.initDataChannel(channel, from);
  166. });
  167.  
  168. session.on('streamDestroyed', (event) => {
  169. const connection = (<StreamEvent>event).stream.connection;
  170. this.closeRtcConnections(connection.connectionId);
  171. });
  172.  
  173. session.on('sessionDisconnected', () => {
  174. for (const id of Object.keys(this.dataChannels)) {
  175. this.closeRtcConnections(id);
  176. }
  177.  
  178. assert(size(this.peers) === 0, 'All peers must be cleared in closeRtcConnections');
  179. });
  180. };
  181.  
  182. private allChannelsNegotiated = (): boolean => {
  183. return this.session.streamManagers.length - 1 === size(this.peers)
  184. && this.session.streamManagers.length - 1 === size(this.dataChannels);
  185. };
  186.  
  187. private initDataChannel = (channel: RTCDataChannel, from: Connection): Promise<void> => {
  188. log('Data channnel created:', channel);
  189. channel.binaryType = 'arraybuffer';
  190. channel.onerror = console.log;
  191. channel.onmessage = this.onMessage;
  192. window.addEventListener('beforeunload', () => channel.close());
  193. this.dataChannels[from.connectionId] = channel;
  194. return this.processQueue();
  195. };
  196.  
  197. private closeRtcConnections = (id: ConnectionId): void => {
  198. this.dataChannels[id].close();
  199. delete this.dataChannels[id];
  200. log('Data channel deleted:', id);
  201.  
  202. this.peers[id].pc.close();
  203. delete this.peers[id];
  204. log('WebRTC peer deleted:', id);
  205. };
  206. }
  207.  
  208. const createFileMetadata = (file: File, time: Date, system: ReceiveMessageSystem): FileMetadata => {
  209. const {name, type, size} = file;
  210. return {name, type, size, time, system};
  211. };
  212.  
  213. const handleSizeOverflow = (metadata: FileMetadata, chunksSize: number): void => {
  214. if (chunksSize > metadata.size) {
  215. log(`File ${metadata.name} size overflow: chunks size ${chunksSize} more then file size ${metadata.size}.`);
  216. }
  217. };
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement