Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- /**
- * Create a Kurento Media Server client which controls the KMS instance running on the same server.
- * Feed a local audio stream into the KMS pipeline.
- * Allow React Native clients to receive the stream through WebRTC.
- * Modified usage of https://github.com/Kurento/kurento-tutorial-node/tree/master/kurento-one2many-call
- */
- const https = require("https");
- const url = require("url");
- const express = require("express");
- const ws = require("ws");
- const fs = require("fs");
- const kurento = require("kurento-client");
/**
 * GLOBAL VARIABLES
 */
// --- Fixed configuration -----------------------------------------------------
// Address this application server listens on (only the port/hostname are used).
const as_uri = "https://localhost:8443/";
// WebSocket endpoint of the Kurento Media Server.
const ws_uri = "ws://localhost:8888/kurento";
// Media source handed to the PlayerEndpoint.
// NOTE(review): a relative path inside a file:// URI looks suspicious - confirm
// KMS resolves it as intended.
const file_uri = "file://../../output.ogg"; // TODO: check
// Rejection text used whenever a listener arrives while nothing is broadcasting.
const noPresenterMessage = "No active presenter. Try again later...";

// --- Mutable runtime state -----------------------------------------------------
// Counter behind nextUniqueId(); incremented once per WebSocket connection.
let idCounter = 0;
// ICE candidates received before a session's endpoint exists, keyed by sessionId.
const candidatesQueue = {};
// Lazily created kurento-client instance; null while disconnected.
let kurentoClient = null;
// The single active broadcast ({ id, pipeline, ... }) or null.
let presenter = null;
// Connected listeners, indexed by sessionId (used as a keyed collection).
let viewers = [];
- /**
- * FUNCTIONS
- */
/**
 * Bump the global session counter and return its new value.
 *
 * @returns {string} the freshly incremented counter as a decimal string
 */
function nextUniqueId() {
  idCounter += 1;
  return String(idCounter);
}
/**
 * Hand the shared Kurento client to `callback`, connecting to the media server
 * at `ws_uri` first if no client is cached yet.
 *
 * On success the new client is stored in the module-level `kurentoClient` so
 * later calls reuse it. On failure nothing is cached and the callback receives
 * a descriptive error string.
 *
 * @param {(error: (string|null), kurentoClient?: object) => void} callback
 */
function getKurentoClient(callback) {
  if (kurentoClient !== null) {
    return callback(null, kurentoClient);
  }
  kurento(ws_uri, (error, _kurentoClient) => {
    if (error) {
      const notFound = "Could not find media server at address " + ws_uri;
      console.log(notFound);
      // The address is separated from the preceding text by a space so the
      // message handed to the caller stays readable.
      return callback(notFound + ". Exiting with error " + error);
    }
    kurentoClient = _kurentoClient;
    callback(null, kurentoClient);
  });
}
/**
 * Start the server-side broadcast.
 *
 * Create a new Media Pipeline in KMS.
 *
 * Create a Player Endpoint in the Media Pipeline that reads `file_uri`, and
 * start playing it. Listeners later connect their own WebRTC endpoints to this
 * player (see `startListener`).
 *
 * Only one presenter may exist at a time; a second request is rejected.
 *
 * @param {string} sessionId identifier stored as `presenter.id`
 * @param {(error: *) => void} callback invoked with an error when setup fails,
 *   or with `null` once the player has started playing
 */
function startPresenter(sessionId, callback) {
  clearCandidatesQueue(sessionId);
  if (presenter !== null) {
    stop(sessionId);
    return callback(
      "Another user is currently acting as presenter. Try again later ..."
    );
  }
  presenter = {
    id: sessionId,
    pipeline: null,
    player: null,
    webRtcEndpoint: null,
  };

  // `presenter` can be reset by stop() while any asynchronous step is pending,
  // so every continuation re-checks it before touching the presenter state.
  const abort = (reason) => {
    stop(sessionId);
    return callback(reason);
  };

  getKurentoClient((error, client) => {
    if (error) {
      return abort(error);
    }
    if (presenter === null) {
      return abort(noPresenterMessage);
    }
    console.log("Creating MediaPipeline...");
    client.create("MediaPipeline", (error, pipeline) => {
      if (error) {
        return abort(error);
      }
      if (presenter === null) {
        return abort(noPresenterMessage);
      }
      console.log("MediaPipeline created.");
      presenter.pipeline = pipeline;
      console.log("Creating PlayerEndpoint...");
      pipeline.create("PlayerEndpoint", { uri: file_uri }, (error, player) => {
        if (error) {
          return abort(error);
        }
        console.log("PlayerEndpoint created.");
        if (presenter === null) {
          return abort(noPresenterMessage);
        }
        player.on("EndOfStream", () => {
          console.warn("End of Stream event");
          // Tear the broadcast down through stop() so the pipeline is released
          // AND `presenter` is cleared; releasing the pipeline alone would leave
          // a stale presenter that listeners keep trying to attach to.
          if (presenter !== null && presenter.id === sessionId) {
            stop(sessionId);
          }
        });
        presenter.player = player;
        player.play((error) => {
          if (error) {
            // A presenter whose player never started is useless: clean it up.
            return abort(error);
          }
          console.log("Playing...");
          // Report success so the caller learns that the broadcast is live.
          callback(null);
        });
      });
    });
  });
}
/**
 * Action taken when `listener` message is received by WebSocket server.
 *
 * Use the presenter's existing Media Pipeline in KMS.
 *
 * Create a WebRTC Endpoint in that pipeline for the listener, feed it any ICE
 * candidates queued for the session, process the listener's SDP offer and
 * connect the presenter's Player Endpoint to the new endpoint.
 *
 * The request is rejected with `noPresenterMessage` when there is no presenter
 * or when the presenter's pipeline/player have not been created yet.
 *
 * @param {string} sessionId identifier of the listener's connection
 * @param {WebSocket} ws socket used to push ICE candidates to the listener
 * @param {string} sdpOffer SDP offer received from the listener
 * @param {(error: *, sdpAnswer?: string) => void} callback receives the SDP
 *   answer on success. If gathering candidates fails after the answer was
 *   delivered, it is invoked a second time with that error.
 */
function startListener(sessionId, ws, sdpOffer, callback) {
  clearCandidatesQueue(sessionId);

  const reject = (reason) => {
    stop(sessionId);
    return callback(reason);
  };

  // The presenter object exists before its pipeline and player do, so a
  // listener arriving during presenter start-up must be turned away instead of
  // dereferencing the still-empty fields.
  if (presenter === null || !presenter.pipeline || !presenter.player) {
    return reject(noPresenterMessage);
  }

  presenter.pipeline.create("WebRtcEndpoint", (error, webRtcEndpoint) => {
    if (error) {
      return reject(error);
    }
    // Register first so that stop() can release the endpoint on any later
    // failure, including the presenter check right below.
    viewers[sessionId] = {
      webRtcEndpoint: webRtcEndpoint,
      ws: ws,
    };
    if (presenter === null) {
      return reject(noPresenterMessage);
    }

    // Flush candidates that arrived before the endpoint existed.
    if (candidatesQueue[sessionId]) {
      while (candidatesQueue[sessionId].length) {
        const queued = candidatesQueue[sessionId].shift();
        webRtcEndpoint.addIceCandidate(queued);
      }
    }

    webRtcEndpoint.on("OnIceCandidate", (event) => {
      const candidate = kurento.getComplexType("IceCandidate")(event.candidate);
      ws.send(
        JSON.stringify({
          id: "iceCandidate",
          candidate: candidate,
        })
      );
    });

    webRtcEndpoint.processOffer(sdpOffer, (error, sdpAnswer) => {
      if (error) {
        return reject(error);
      }
      if (presenter === null) {
        return reject(noPresenterMessage);
      }
      presenter.player.connect(webRtcEndpoint, (error) => {
        if (error) {
          return reject(error);
        }
        if (presenter === null) {
          return reject(noPresenterMessage);
        }
        callback(null, sdpAnswer);
        webRtcEndpoint.gatherCandidates((error) => {
          if (error) {
            return reject(error);
          }
        });
      });
    });
  });
}
/**
 * Forget any ICE candidates still queued for the given session.
 *
 * @param {string} sessionId key into the module-level `candidatesQueue`
 */
function clearCandidatesQueue(sessionId) {
  const pending = candidatesQueue[sessionId];
  if (!pending) {
    return;
  }
  delete candidatesQueue[sessionId];
}
/**
 * Tear down whatever is associated with `sessionId`.
 *
 * - If the session is the presenter: notify every viewer with a
 *   `stopCommunication` message, release the media pipeline (if one was
 *   created), and reset `presenter` and `viewers`.
 * - If the session is a viewer: release its WebRTC endpoint and forget it.
 *
 * In both cases queued ICE candidates for the session are dropped, and the
 * Kurento client is closed once neither a presenter nor any viewer remains.
 *
 * Safe to call at any stage of set-up, including before the pipeline or the
 * Kurento client exist.
 *
 * @param {string} sessionId
 */
function stop(sessionId) {
  if (presenter !== null && presenter.id === sessionId) {
    // `viewers` is a keyed collection; Object.values skips unset slots.
    Object.values(viewers).forEach((viewer) => {
      if (!viewer.ws) {
        return;
      }
      try {
        viewer.ws.send(
          JSON.stringify({
            id: "stopCommunication",
          })
        );
      } catch (error) {
        // One unreachable viewer must not prevent releasing the pipeline.
        console.error("Could not notify viewer of stopCommunication", error);
      }
    });
    // The pipeline is still null when start-up failed before it was created.
    if (presenter.pipeline) {
      presenter.pipeline.release();
    }
    presenter = null;
    viewers = [];
  } else if (viewers[sessionId]) {
    viewers[sessionId].webRtcEndpoint.release();
    delete viewers[sessionId];
  }

  clearCandidatesQueue(sessionId);

  // Count actual entries: `.length` of a keyed/sparse array does not shrink
  // when entries are deleted. Also skip the close when no client was created.
  const hasViewers = Object.keys(viewers).length > 0;
  if (!hasViewers && !presenter && kurentoClient !== null) {
    console.log("Closing kurento client");
    kurentoClient.close();
    kurentoClient = null;
  }
}
/**
 * Route an ICE candidate received from a peer.
 *
 * The candidate is wrapped in Kurento's `IceCandidate` complex type and then
 * either added to the matching WebRTC endpoint (presenter or viewer) or, when
 * no endpoint exists for the session yet, appended to `candidatesQueue`.
 *
 * @param {string} sessionId session the candidate belongs to
 * @param {object} _candidate raw candidate as received from the peer
 */
function onIceCandidate(sessionId, _candidate) {
  const iceCandidate = kurento.getComplexType("IceCandidate")(_candidate);

  // This first one should never occur
  const presenterHasEndpoint =
    presenter && presenter.id === sessionId && presenter.webRtcEndpoint;
  if (presenterHasEndpoint) {
    console.error("Presenter WebRTC should not be triggered! Debug.");
    console.info("Sending presenter candidate");
    presenter.webRtcEndpoint.addIceCandidate(iceCandidate);
    return;
  }

  const viewer = viewers[sessionId];
  if (viewer && viewer.webRtcEndpoint) {
    console.info("Sending viewer candidate");
    viewers[sessionId].webRtcEndpoint.addIceCandidate(iceCandidate);
    return;
  }

  // No endpoint yet: park the candidate until one is created.
  console.info("Queueing candidate");
  if (!candidatesQueue[sessionId]) {
    candidatesQueue[sessionId] = [];
  }
  candidatesQueue[sessionId].push(iceCandidate);
}
/**
 * SERVER
 * HTTPS server hosting the Express app; the WebSocket signalling endpoint is
 * attached to this same server.
 */
const app = express();
// TLS material is read synchronously at start-up, relative to the working directory.
const options = {
  cert: fs.readFileSync("../ssl/fullchain.pem"),
  key: fs.readFileSync("../ssl/privkey.pem"),
};
const asUrl = url.parse(as_uri);
const port = asUrl.port;
const server = https.createServer(options, app);
server.listen(port, () => {
  console.log(`Kurento Server started on ${asUrl.hostname}:${port}`);
});
/**
 * WEBSOCKET SIGNALLING
 * Facilitates WebRTC connection of peers to KMS
 *
 * Every connection gets its own sessionId. Supported client messages (JSON,
 * discriminated by `id`):
 *   - "listener":       { sdpOffer }  -> answered with "listenerResponse"
 *   - "stop":           tear down the session
 *   - "onIceCandidate": { candidate } -> forwarded to / queued for the endpoint
 * Anything else, including malformed JSON, is answered with an "error" message.
 */
const wss = new ws.Server({
  server: server,
  path: "/broadcast",
});
wss.on("connection", (socket) => {
  const sessionId = nextUniqueId();
  console.log("Connection received with sessionId " + sessionId);

  // Serialise and send one signalling message to this peer.
  const send = (payload) => socket.send(JSON.stringify(payload));

  socket.on("error", (error) => {
    console.log("Connection error");
    console.error(error);
    stop(sessionId);
  });

  socket.on("close", () => {
    console.log("Connection " + sessionId + " closed");
    stop(sessionId);
  });

  socket.on("message", (wsMessage) => {
    // Client input is untrusted: a parse failure must not escape the handler,
    // otherwise a single bad frame would take the whole process down.
    let message;
    try {
      message = JSON.parse(wsMessage);
    } catch (parseError) {
      console.error("Connection " + sessionId + " sent malformed JSON");
      return send({
        id: "error",
        message: "Invalid message " + wsMessage,
      });
    }
    console.log("Connection " + sessionId + " received message ", message);

    // JSON such as `null` or `42` parses fine but carries no `id`.
    const messageId =
      message !== null && typeof message === "object" ? message.id : undefined;

    switch (messageId) {
      case "listener":
        startListener(sessionId, socket, message.sdpOffer, (error, sdpAnswer) => {
          if (error) {
            return send({
              id: "listenerResponse",
              response: "rejected",
              message: error,
            });
          }
          send({
            id: "listenerResponse",
            response: "accepted",
            sdpAnswer: sdpAnswer,
          });
        });
        break;
      case "stop":
        stop(sessionId);
        break;
      case "onIceCandidate":
        onIceCandidate(sessionId, message.candidate);
        break;
      default:
        // Echo the offending payload as JSON so it is readable on the client.
        send({
          id: "error",
          message: "Invalid message " + JSON.stringify(message),
        });
        break;
    }
  });
});
/**
 * Start broadcast
 *
 * Kick off the server-side presenter at load time and log the outcome:
 * a "rejected" record when start-up fails, an "accepted" record otherwise.
 */
startPresenter("broadcast", (error, sdpAnswer) => {
  if (error) {
    console.error({
      id: "presenterResponse",
      response: "rejected",
      message: error,
    });
    // Stop here so a failure is not followed by an "accepted" log entry.
    return;
  }
  console.log({
    id: "presenterResponse",
    response: "accepted",
    sdpAnswer: sdpAnswer,
  });
});
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement