Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import random
- import string
- import time
- import logging
- import asyncio
- from aiortc import RTCSessionDescription
- logger = logging.getLogger(__name__)
- def transaction_id():
- return "".join(random.choice(string.ascii_letters) for x in range(12))
- async def connect_to_janus(id, pc, session, add_track_callback):
- await session.create()
- @pc.on("track")
- def on_track(track):
- logger.info("Receiving {}".format(track))
- add_track_callback(track)
- plugin = await session.attach("janus.plugin.streaming")
- response = await plugin.sync_send({"body": {"request": "list"}})
- logger.info(response["plugindata"]["data"]["list"])
- response = await plugin.async_send(
- {"body": {
- "request": "watch",
- "id": id
- }})
- assert (response["jsep"]["type"] == "offer")
- offer = RTCSessionDescription(sdp=response["jsep"]["sdp"],
- type=response["jsep"]["type"])
- await pc.setRemoteDescription(offer)
- await pc.setLocalDescription(await pc.createAnswer())
- request = {"request": "start"}
- response = await plugin.async_send({
- "body": request,
- "jsep": {
- "sdp": pc.localDescription.sdp,
- "trickle": False,
- "type": pc.localDescription.type
- },
- })
- class JanusPlugin:
- def __init__(self, session, url):
- self._queue = asyncio.Queue()
- self._session = session
- self._url = url
- async def sync_send(self, payload):
- message = {"janus": "message", "transaction": transaction_id()}
- message.update(payload)
- async with self._session._http.post(self._url,
- json=message) as response:
- data = await response.json()
- assert data["janus"] == "success"
- return data
- async def async_send(self, payload):
- message = {"janus": "message", "transaction": transaction_id()}
- message.update(payload)
- logger.debug(f"TO JANUS: {message}")
- async with self._session._http.post(self._url,
- json=message) as response:
- data = await response.json()
- logger.debug(data)
- assert data["janus"] == "ack"
- response = await self._queue.get()
- assert response["transaction"] == message["transaction"]
- return response
- class JanusSession:
- def __init__(self, http_session, url):
- self._http = http_session
- self._poll_task = None
- self._plugins = {}
- self._root_url = url
- self._session_url = None
- async def attach(self, plugin):
- message = {
- "janus": "attach",
- "plugin": plugin,
- "transaction": transaction_id()
- }
- async with self._http.post(self._session_url,
- json=message) as response:
- data = await response.json()
- assert data["janus"] == "success"
- plugin_id = data["data"]["id"]
- plugin = JanusPlugin(self,
- self._session_url + "/" + str(plugin_id))
- self._plugins[plugin_id] = plugin
- return plugin
- async def create(self):
- message = {"janus": "create", "transaction": transaction_id()}
- async with self._http.post(self._root_url, json=message) as response:
- data = await response.json()
- assert data["janus"] == "success"
- session_id = data["data"]["id"]
- self._session_url = self._root_url + "/" + str(session_id)
- self._poll_task = asyncio.ensure_future(self._poll())
- async def destroy(self):
- if self._poll_task:
- self._poll_task.cancel()
- self._poll_task = None
- if self._session_url:
- message = {"janus": "destroy", "transaction": transaction_id()}
- async with self._http.post(self._session_url,
- json=message) as response:
- data = await response.json()
- assert data["janus"] == "success"
- self._session_url = None
- async def _poll(self):
- while True:
- params = {"maxev": 1, "rid": int(time.time() * 1000)}
- async with self._http.get(self._session_url,
- params=params) as response:
- data = await response.json()
- if data["janus"] == "event":
- plugin = self._plugins.get(data["sender"], None)
- if plugin:
- await plugin._queue.put(data)
- else:
- logger.debug(data)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement