Advertisement
Guest User

janus.py

a guest
Jul 4th, 2022
77
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 4.75 KB | None | 0 0
  1. import random
  2. import string
  3. import time
  4. import logging
  5.  
  6. import asyncio
  7. from aiortc import RTCSessionDescription
  8.  
  9. logger = logging.getLogger(__name__)
  10.  
  11.  
  12. def transaction_id():
  13.     return "".join(random.choice(string.ascii_letters) for x in range(12))
  14.  
  15.  
  16. async def connect_to_janus(id, pc, session, add_track_callback):
  17.     await session.create()
  18.  
  19.     @pc.on("track")
  20.     def on_track(track):
  21.         logger.info("Receiving {}".format(track))
  22.         add_track_callback(track)
  23.  
  24.     plugin = await session.attach("janus.plugin.streaming")
  25.     response = await plugin.sync_send({"body": {"request": "list"}})
  26.  
  27.     logger.info(response["plugindata"]["data"]["list"])
  28.  
  29.     response = await plugin.async_send(
  30.         {"body": {
  31.             "request": "watch",
  32.             "id": id
  33.         }})
  34.  
  35.     assert (response["jsep"]["type"] == "offer")
  36.     offer = RTCSessionDescription(sdp=response["jsep"]["sdp"],
  37.                                   type=response["jsep"]["type"])
  38.  
  39.     await pc.setRemoteDescription(offer)
  40.     await pc.setLocalDescription(await pc.createAnswer())
  41.     request = {"request": "start"}
  42.     response = await plugin.async_send({
  43.         "body": request,
  44.         "jsep": {
  45.             "sdp": pc.localDescription.sdp,
  46.             "trickle": False,
  47.             "type": pc.localDescription.type
  48.         },
  49.     })
  50.  
  51.  
  52. class JanusPlugin:
  53.     def __init__(self, session, url):
  54.         self._queue = asyncio.Queue()
  55.         self._session = session
  56.         self._url = url
  57.  
  58.     async def sync_send(self, payload):
  59.         message = {"janus": "message", "transaction": transaction_id()}
  60.         message.update(payload)
  61.         async with self._session._http.post(self._url,
  62.                                             json=message) as response:
  63.             data = await response.json()
  64.             assert data["janus"] == "success"
  65.             return data
  66.  
  67.     async def async_send(self, payload):
  68.         message = {"janus": "message", "transaction": transaction_id()}
  69.         message.update(payload)
  70.         logger.debug(f"TO JANUS: {message}")
  71.         async with self._session._http.post(self._url,
  72.                                             json=message) as response:
  73.             data = await response.json()
  74.             logger.debug(data)
  75.             assert data["janus"] == "ack"
  76.  
  77.         response = await self._queue.get()
  78.         assert response["transaction"] == message["transaction"]
  79.         return response
  80.  
  81.  
  82. class JanusSession:
  83.     def __init__(self, http_session, url):
  84.         self._http = http_session
  85.         self._poll_task = None
  86.         self._plugins = {}
  87.         self._root_url = url
  88.         self._session_url = None
  89.  
  90.     async def attach(self, plugin):
  91.         message = {
  92.             "janus": "attach",
  93.             "plugin": plugin,
  94.             "transaction": transaction_id()
  95.         }
  96.         async with self._http.post(self._session_url,
  97.                                    json=message) as response:
  98.             data = await response.json()
  99.             assert data["janus"] == "success"
  100.             plugin_id = data["data"]["id"]
  101.             plugin = JanusPlugin(self,
  102.                                  self._session_url + "/" + str(plugin_id))
  103.             self._plugins[plugin_id] = plugin
  104.             return plugin
  105.  
  106.     async def create(self):
  107.         message = {"janus": "create", "transaction": transaction_id()}
  108.         async with self._http.post(self._root_url, json=message) as response:
  109.             data = await response.json()
  110.             assert data["janus"] == "success"
  111.             session_id = data["data"]["id"]
  112.             self._session_url = self._root_url + "/" + str(session_id)
  113.  
  114.         self._poll_task = asyncio.ensure_future(self._poll())
  115.  
  116.     async def destroy(self):
  117.         if self._poll_task:
  118.             self._poll_task.cancel()
  119.             self._poll_task = None
  120.  
  121.         if self._session_url:
  122.             message = {"janus": "destroy", "transaction": transaction_id()}
  123.             async with self._http.post(self._session_url,
  124.                                        json=message) as response:
  125.                 data = await response.json()
  126.                 assert data["janus"] == "success"
  127.             self._session_url = None
  128.  
  129.     async def _poll(self):
  130.         while True:
  131.             params = {"maxev": 1, "rid": int(time.time() * 1000)}
  132.             async with self._http.get(self._session_url,
  133.                                       params=params) as response:
  134.                 data = await response.json()
  135.                 if data["janus"] == "event":
  136.                     plugin = self._plugins.get(data["sender"], None)
  137.                     if plugin:
  138.                         await plugin._queue.put(data)
  139.                     else:
  140.                         logger.debug(data)
  141.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement