Advertisement
Tristan-Nel

Untitled

Apr 30th, 2020
596
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. /**
  2.  * Create a Kurento Media Server client which controls the KMS instance running on the same server.
  3.  * Feed a local audio stream into the KMS pipeline.
  4.  * Allow React Native clients to receive the stream through WebRTC.
  5.  * Modified usage of https://github.com/Kurento/kurento-tutorial-node/tree/master/kurento-one2many-call
  6.  */
  7. const https = require("https");
  8. const url = require("url");
  9. const express = require("express");
  10. const ws = require("ws");
  11. const fs = require("fs");
  12. const kurento = require("kurento-client");
  13.  
  14. /**
  15.  * GLOBAL VARIABLES
  16.  */
  17.  
  18. var idCounter = 0;
  19. var candidatesQueue = {};
  20. var kurentoClient = null;
  21. var presenter = null;
  22. var viewers = [];
  23. const as_uri = "https://localhost:8443/";
  24. const ws_uri = "ws://localhost:8888/kurento";
  25. const file_uri = "file://../../output.ogg"; // TODO: check
  26. const noPresenterMessage = "No active presenter. Try again later...";
  27.  
  28. /**
  29.  * FUNCTIONS
  30.  */
  31.  
  32. /**
  33.  * Increment the global sessionId counter
  34.  */
  35. function nextUniqueId() {
  36.   idCounter++;
  37.   return idCounter.toString();
  38. }
  39.  
  40. /**
  41.  * Recover kurentoClient for the first time.
  42.  * @param {(error: string, kurentoClient) => void} callback
  43.  */
  44. function getKurentoClient(callback) {
  45.   if (kurentoClient !== null) {
  46.     return callback(null, kurentoClient);
  47.   }
  48.  
  49.   kurento(ws_uri, (error, _kurentoClient) => {
  50.     if (error) {
  51.       console.log("Could not find media server at address " + ws_uri);
  52.       return callback(
  53.         "Could not find media server at address" +
  54.           ws_uri +
  55.           ". Exiting with error " +
  56.           error
  57.       );
  58.     }
  59.  
  60.     kurentoClient = _kurentoClient;
  61.     callback(null, kurentoClient);
  62.   });
  63. }
  64.  
  65. /**
  66.  * Action taken when `presenter` message is received by WebSocket server.
  67.  *
  68.  * Create a new Media Pipeline in KMS.
  69.  *
  70.  * Create Player Endpoint in the Media Pipeline to read local audio file.
  71.  *
  72.  * Create WebRTC Endpoint in the Media Pipeline to serve the stream to other peers.
  73.  *
  74.  * Connect the Player Endpoint to the WebRTC Endpoint.
  75.  *
  76.  * @param {string} sessionId
  77.  * @param {WebSocket} ws
  78.  * @param {{}} sdpOffer
  79.  * @param {(value) => void} callback
  80.  */
  81. function startPresenter(sessionId, callback) {
  82.   clearCandidatesQueue(sessionId);
  83.  
  84.   if (presenter !== null) {
  85.     stop(sessionId);
  86.     return callback(
  87.       "Another user is currently acting as presenter. Try again later ..."
  88.     );
  89.   }
  90.  
  91.   presenter = {
  92.     id: sessionId,
  93.     pipeline: null,
  94.     webRtcEndpoint: null,
  95.   };
  96.  
  97.   getKurentoClient((error, kurentoClient) => {
  98.     if (error) {
  99.       stop(sessionId);
  100.       return callback(error);
  101.     }
  102.  
  103.     if (presenter === null) {
  104.       stop(sessionId);
  105.       return callback(noPresenterMessage);
  106.     }
  107.  
  108.     console.log("Creating MediaPipeline...");
  109.  
  110.     kurentoClient.create("MediaPipeline", function (error, pipeline) {
  111.       if (error) {
  112.         stop(sessionId);
  113.         return callback(error);
  114.       }
  115.  
  116.       if (presenter === null) {
  117.         stop(sessionId);
  118.         return callback(noPresenterMessage);
  119.       }
  120.  
  121.       console.log("MediaPipeline created.");
  122.  
  123.       presenter.pipeline = pipeline;
  124.  
  125.       console.log("Creating PlayerEndpoint...");
  126.  
  127.       const options = { uri: file_uri };
  128.  
  129.       pipeline.create("PlayerEndpoint", options, (error, player) => {
  130.         if (error) {
  131.           stop(sessionId);
  132.           return callback(error);
  133.         }
  134.  
  135.         console.log("PlayerEndpoint created.");
  136.  
  137.         player.on("EndOfStream", (event) => {
  138.           pipeline.release();
  139.           // TODO: Handle ?
  140.           console.warn("End of Stream event");
  141.         });
  142.  
  143.         if (presenter === null) {
  144.           stop(sessionId);
  145.           return callback(noPresenterMessage);
  146.         }
  147.  
  148.         presenter.player = player;
  149.  
  150.         player.play((error) => {
  151.           if (error) return callback(error);
  152.           console.log("Playing...");
  153.         });
  154.       });
  155.     });
  156.   });
  157. }
  158.  
  159. /**
  160.  * Action taken when `listener` message is received by WebSocket server.
  161.  *
  162.  * Use existing Media Pipeline in KMS.
  163.  *
  164.  * Create WebRTC Endpoint in the Media Pipeline for the listener to receive the stream through.
  165.  *
  166.  * Connect listener WebRTC Endpoint to broadcaster WebRTC endpoint.
  167.  *
  168.  * @param {string} sessionId
  169.  * @param {WebSocket} ws
  170.  * @param {{}} sdpOffer
  171.  * @param {(value) => void} callback
  172.  */
  173. function startListener(sessionId, ws, sdpOffer, callback) {
  174.   clearCandidatesQueue(sessionId);
  175.  
  176.   if (presenter === null) {
  177.     stop(sessionId);
  178.     return callback(noPresenterMessage);
  179.   }
  180.  
  181.   presenter.pipeline.create("WebRtcEndpoint", function (error, webRtcEndpoint) {
  182.     if (error) {
  183.       stop(sessionId);
  184.       return callback(error);
  185.     }
  186.     viewers[sessionId] = {
  187.       webRtcEndpoint: webRtcEndpoint,
  188.       ws: ws,
  189.     };
  190.  
  191.     if (presenter === null) {
  192.       stop(sessionId);
  193.       return callback(noPresenterMessage);
  194.     }
  195.  
  196.     if (candidatesQueue[sessionId]) {
  197.       while (candidatesQueue[sessionId].length) {
  198.         var candidate = candidatesQueue[sessionId].shift();
  199.         webRtcEndpoint.addIceCandidate(candidate);
  200.       }
  201.     }
  202.  
  203.     webRtcEndpoint.on("OnIceCandidate", function (event) {
  204.       var candidate = kurento.getComplexType("IceCandidate")(event.candidate);
  205.       ws.send(
  206.         JSON.stringify({
  207.           id: "iceCandidate",
  208.           candidate: candidate,
  209.         })
  210.       );
  211.     });
  212.  
  213.     webRtcEndpoint.processOffer(sdpOffer, function (error, sdpAnswer) {
  214.       if (error) {
  215.         stop(sessionId);
  216.         return callback(error);
  217.       }
  218.       if (presenter === null) {
  219.         stop(sessionId);
  220.         return callback(noPresenterMessage);
  221.       }
  222.  
  223.       presenter.player.connect(webRtcEndpoint, function (error) {
  224.         if (error) {
  225.           stop(sessionId);
  226.           return callback(error);
  227.         }
  228.         if (presenter === null) {
  229.           stop(sessionId);
  230.           return callback(noPresenterMessage);
  231.         }
  232.  
  233.         callback(null, sdpAnswer);
  234.         webRtcEndpoint.gatherCandidates(function (error) {
  235.           if (error) {
  236.             stop(sessionId);
  237.             return callback(error);
  238.           }
  239.         });
  240.       });
  241.     });
  242.   });
  243. }
  244.  
  245. function clearCandidatesQueue(sessionId) {
  246.   if (candidatesQueue[sessionId]) {
  247.     delete candidatesQueue[sessionId];
  248.   }
  249. }
  250.  
  251. function stop(sessionId) {
  252.   if (presenter !== null && presenter.id == sessionId) {
  253.     for (var i in viewers) {
  254.       var viewer = viewers[i];
  255.       if (viewer.ws) {
  256.         viewer.ws.send(
  257.           JSON.stringify({
  258.             id: "stopCommunication",
  259.           })
  260.         );
  261.       }
  262.     }
  263.     presenter.pipeline.release();
  264.     presenter = null;
  265.     viewers = [];
  266.   } else if (viewers[sessionId]) {
  267.     viewers[sessionId].webRtcEndpoint.release();
  268.     delete viewers[sessionId];
  269.   }
  270.  
  271.   clearCandidatesQueue(sessionId);
  272.  
  273.   if (viewers.length < 1 && !presenter) {
  274.     console.log("Closing kurento client");
  275.     kurentoClient.close();
  276.     kurentoClient = null;
  277.   }
  278. }
  279.  
  280. function onIceCandidate(sessionId, _candidate) {
  281.   var candidate = kurento.getComplexType("IceCandidate")(_candidate);
  282.  
  283.   // This first one should never occur
  284.   if (presenter && presenter.id === sessionId && presenter.webRtcEndpoint) {
  285.     console.error("Presenter WebRTC should not be triggered! Debug.");
  286.     console.info("Sending presenter candidate");
  287.     presenter.webRtcEndpoint.addIceCandidate(candidate);
  288.   } else if (viewers[sessionId] && viewers[sessionId].webRtcEndpoint) {
  289.     console.info("Sending viewer candidate");
  290.     viewers[sessionId].webRtcEndpoint.addIceCandidate(candidate);
  291.   } else {
  292.     console.info("Queueing candidate");
  293.     if (!candidatesQueue[sessionId]) {
  294.       candidatesQueue[sessionId] = [];
  295.     }
  296.     candidatesQueue[sessionId].push(candidate);
  297.   }
  298. }
  299.  
  300. /**
  301.  * SERVER
  302.  */
  303. const app = express();
  304. const options = {
  305.   cert: fs.readFileSync("../ssl/fullchain.pem"),
  306.   key: fs.readFileSync("../ssl/privkey.pem"),
  307. };
  308. const asUrl = url.parse(as_uri);
  309. const port = asUrl.port;
  310. const server = https.createServer(options, app).listen(port, () => {
  311.   console.log(`Kurento Server started on ${asUrl.hostname}:${port}`);
  312.   //console.log("Open " + url.format(asUrl) + " with a WebRTC capable browser");
  313. });
  314.  
  315. /**
  316.  * WEBSOCKET SIGNALLING
  317.  * Facilitates WebRTC connection of peers to KMS
  318.  */
  319.  
  320. const wss = new ws.Server({
  321.   server: server,
  322.   path: "/broadcast",
  323. });
  324.  
  325. wss.on("connection", (ws) => {
  326.   const sessionId = nextUniqueId();
  327.   console.log("Connection received with sessionId " + sessionId);
  328.  
  329.   ws.on("error", (error) => {
  330.     console.log("Connection error");
  331.     console.error(error);
  332.     stop(sessionId);
  333.   });
  334.  
  335.   ws.on("close", () => {
  336.     console.log("Connection " + sessionId + " closed");
  337.     stop(sessionId);
  338.   });
  339.  
  340.   ws.on("message", (wsMessage) => {
  341.     const message = JSON.parse(wsMessage);
  342.     console.log("Connection " + sessionId + " received message ", message);
  343.  
  344.     switch (message.id) {
  345.       case "listener":
  346.         startListener(sessionId, ws, message.sdpOffer, (error, sdpAnswer) => {
  347.           if (error) {
  348.             return ws.send(
  349.               JSON.stringify({
  350.                 id: "listenerResponse",
  351.                 response: "rejected",
  352.                 message: error,
  353.               })
  354.             );
  355.           }
  356.  
  357.           ws.send(
  358.             JSON.stringify({
  359.               id: "listenerResponse",
  360.               response: "accepted",
  361.               sdpAnswer: sdpAnswer,
  362.             })
  363.           );
  364.         });
  365.         break;
  366.  
  367.       case "stop":
  368.         stop(sessionId);
  369.         break;
  370.  
  371.       case "onIceCandidate":
  372.         onIceCandidate(sessionId, message.candidate);
  373.         break;
  374.  
  375.       default:
  376.         ws.send(
  377.           JSON.stringify({
  378.             id: "error",
  379.             message: "Invalid message " + message,
  380.           })
  381.         );
  382.         break;
  383.     }
  384.   });
  385. });
  386.  
  387. /**
  388.  * Start broadcast
  389.  */
  390.  
  391. startPresenter("broadcast", (error, sdpAnswer) => {
  392.   if (error) {
  393.     console.error({
  394.       id: "presenterResponse",
  395.       response: "rejected",
  396.       message: error,
  397.     });
  398.   }
  399.   console.log({
  400.     id: "presenterResponse",
  401.     response: "accepted",
  402.     sdpAnswer: sdpAnswer,
  403.   });
  404. });
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement