Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import asyncio
- from threading import current_thread
- import aiohttp
- import os
- import logging
- import sys
- import gi
- gi.require_version('Gst', '1.0')
- gi.require_version('GstWebRTC', '1.0')
- gi.require_version('GstSdp', '1.0')
- from gi.repository import GLib, Gst, GstWebRTC, GstSdp
- from dvr_processor.dvr_processor.janus import JanusSession
# Root logger: INFO and above, written to stdout as "LEVEL    logger:message".
logging.basicConfig(
    format='%(levelname)-8s %(name)s:%(message)s',
    stream=sys.stdout,
    level=logging.INFO,
)

# Module logger used by everything defined in this file.
logger = logging.getLogger(__name__)

# Per-logger overrides: only CRITICAL records pass for the two h264 loggers,
# while the janus helper module logs at DEBUG.
logging.getLogger("dvr_processor.dvr_processor.janus").setLevel(logging.DEBUG)
logging.getLogger("codec.h264").setLevel(logging.CRITICAL)
logging.getLogger("libav.h264").setLevel(logging.CRITICAL)
async def get_info():
    """Fetch the Janus videostream list from the DVR API and return its first entry.

    Reads ``DVR_API_LOGIN``, ``DVR_API_PASSWORD`` and ``DVR_API_HOST`` from the
    environment, performs an HTTP basic-auth GET on
    ``{DVR_API_HOST}/api/videostream/janus/all`` and decodes the JSON body.

    Returns:
        The first element of the decoded JSON response.

    Raises:
        KeyError: if one of the required environment variables is missing.
        aiohttp.ClientResponseError: if the backend answers with an error status.
        IndexError: if the backend returns an empty list.
    """
    credentials = aiohttp.BasicAuth(
        os.environ["DVR_API_LOGIN"], os.environ["DVR_API_PASSWORD"])
    lge_url = os.environ["DVR_API_HOST"]

    # ``async with`` guarantees the session and the response are released even
    # when the request, the status check or the JSON decoding raises.
    async with aiohttp.ClientSession(auth=credentials) as session:
        # wait backend
        await asyncio.sleep(30)  # FIXME
        async with session.get(f"{lge_url}/api/videostream/janus/all") as resp:
            resp.raise_for_status()
            streams = await resp.json()

    if not streams:
        raise IndexError("DVR API returned no janus videostream entries")
    return streams[0]
class WebRTCClient:
    """Receive-only WebRTC client for a Janus streaming mountpoint.

    The client asks the Janus ``janus.plugin.streaming`` plugin to ``watch`` a
    mountpoint, feeds the returned SDP offer into a GStreamer ``webrtcbin``
    element, and sends the SDP answer produced by ``webrtcbin`` back to Janus.
    Every source pad that ``webrtcbin`` exposes is connected to a ``fakesink``.
    """

    def __init__(self, conf):  # FIXME
        """Initialise GStreamer and the Janus session wrapper.

        Args:
            conf: mapping providing ``rtspMountpointId``, ``dataMountpointId``
                and ``janusPort``.  The Janus host name is read from the
                ``JANUS_HOST`` environment variable.
        """
        # see https://gitlab.freedesktop.org/gstreamer/gst-examples/-/blob/master/webrtc/sendrecv/gst/webrtc_sendrecv.py
        self.mp = conf["rtspMountpointId"]
        # Stored for later use; nothing in this class reads it yet.
        self.dp = conf["dataMountpointId"]
        janus_port = conf["janusPort"]
        Gst.init(None)
        http_session = aiohttp.ClientSession()
        janus_url = f"http://{os.environ['JANUS_HOST']}:{janus_port}/janus"
        self.janus_session = JanusSession(http_session, janus_url)
        # Remember the loop so GStreamer callbacks can hand work back to it.
        self.loop = asyncio.get_event_loop()
        logger.info(f"initializing in {current_thread()}")

    def start_pipeline(self):
        """Create the ``webrtcbin`` element and set it to PLAYING."""
        logger.info(f"begin start_pipeline in {current_thread()}")
        self.pipe = Gst.parse_launch("webrtcbin name=wrb")
        # NOTE(review): the description holds a single element, so ``pipe`` and
        # ``webrtc`` are deliberately the same object here.
        self.webrtc = self.pipe
        self.webrtc.connect('pad-added', self.on_incoming_stream)
        self.pipe.set_state(Gst.State.PLAYING)
        logger.info("end start_pipeline")

    def remote_sdp(self):
        """Return the remote description as SDP text, for diagnostics only.

        Returns:
            The SDP text when a remote description is set; ``None`` when none
            is set; the raw description object when it cannot be rendered as
            text; or the string ``"Failed get_property"`` when the property
            cannot be read at all.  Never raises for ordinary errors.
        """
        try:
            descr = self.webrtc.get_property('remote-description')
        except Exception:
            # Best effort: this value is only used in a log message.
            return "Failed get_property"
        if descr is None:
            return descr
        try:
            return descr.sdp.as_text()
        except Exception:
            return descr

    def negotiate(self):
        """Ask ``webrtcbin`` to create an SDP answer.

        Returns:
            The ``Gst.Promise`` passed to ``create-answer``;
            ``on_answer_created`` is registered as its change function.
        """
        logger.info(f"begin negotiate in {current_thread()}")
        logger.info(f"negotiate -- remote-description sdp: {self.remote_sdp()}")
        promise = Gst.Promise.new_with_change_func(self.on_answer_created, self.webrtc, None)
        self.webrtc.emit('create-answer', None, promise)
        logger.info("end negotiate")
        return promise

    def on_answer_created(self, promise, _, __):
        """Promise change function: apply the answer locally and send it to Janus.

        Args:
            promise: the ``create-answer`` promise whose reply carries the
                ``answer`` session description.
            _: unused user data.
            __: unused user data.
        """
        logger.info(f"begin on_answer_created in {current_thread()}")
        promise.wait()
        answer = promise.get_reply().get_value("answer")
        logger.info(f"answer: {answer}")
        logger.info(f"answer fields: {answer.type}")
        logger.info(f"answer fields: {answer.type} {answer.type.real}")
        logger.info(f"answer fields: {answer.type} {answer.type.to_string(answer.type)}")
        logger.info(f"answer sdp is None: {answer.sdp is None}")
        logger.info(f"answer fields: {answer.sdp}")
        sdp = answer.sdp.as_text()
        logger.info(f"answer sdp: {sdp}")
        promise2 = Gst.Promise.new()
        self.webrtc.emit('set-local-description', answer, promise2)
        logger.info("middle on_answer_created, after set-local-description")
        self.send_sdp_schedule(sdp)
        logger.info("end on_answer_created")

    def on_incoming_stream(self, _, pad):
        """``pad-added`` handler: connect a new source pad to a ``fakesink``.

        Args:
            _: the element that emitted the signal (unused).
            pad: the newly added pad; pads that are not source pads are ignored.
        """
        logger.info(f"begin on_incoming stream in {current_thread()}")
        if pad.direction != Gst.PadDirection.SRC:
            return
        sink = Gst.ElementFactory.make("fakesink")
        self.pipe.add(sink)
        sink.sync_state_with_parent()
        # Surface a failed link instead of silently dropping the result.
        if not self.webrtc.link(sink):
            logger.warning(f"failed to link {pad.get_name()} to fakesink")
        logger.info(f"added stream in {current_thread()}")

    def handle_sdp_offer(self, sdp):
        """Parse an SDP offer and set it as the remote description.

        Args:
            sdp: the SDP offer as text.

        Returns:
            The ``Gst.Promise`` passed to ``set-remote-description``.

        Raises:
            RuntimeError: if an SDP message object cannot be allocated.
            ValueError: if ``sdp`` cannot be parsed.
        """
        logger.info("begin handle_sdp_offer")
        res, sdpmsg = GstSdp.SDPMessage.new()
        # Explicit checks instead of ``assert`` so they survive ``python -O``.
        if res != GstSdp.SDPResult.OK:
            raise RuntimeError(f"unexpected res {res}")
        res = GstSdp.sdp_message_parse_buffer(sdp.encode(), sdpmsg)
        if res != GstSdp.SDPResult.OK:
            raise ValueError(f"failed to parse SDP offer: {res}")
        offer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.OFFER, sdpmsg)
        promise = Gst.Promise.new()
        logger.info(f"wrb target state: {self.webrtc.get_property('signaling-state')}")
        self.webrtc.emit('set-remote-description', offer, promise)
        logger.info("end handle_sdp_offer")
        return promise

    async def get_sdp_offer(self):
        """Create a Janus session, attach to the streaming plugin and request an offer.

        Returns:
            The SDP text of the offer returned for the ``watch`` request.

        Raises:
            RuntimeError: if the JSEP returned by Janus is not an offer.
        """
        await self.janus_session.create()
        plugin = self.janus_plugin = await self.janus_session.attach("janus.plugin.streaming")
        response = await plugin.async_send(
            {"body": {
                "request": "watch",
                "id": self.mp
            }})
        jsep_type = response["jsep"]["type"]
        if jsep_type != "offer":
            raise RuntimeError(f"expected an SDP offer from Janus, got {jsep_type!r}")
        sdp = response["jsep"]["sdp"]
        logger.info(f"received SDP offer in {current_thread()}: {sdp}")
        return sdp

    async def connect(self):
        """Run the whole handshake: fetch offer, start pipeline, create answer."""
        logger.info(f"begin connect in {current_thread()}")
        sdp = await self.get_sdp_offer()
        self.start_pipeline()
        # NOTE: ``Gst.Promise.wait`` blocks the calling thread, i.e. the event
        # loop does not run while these two waits are in progress.
        p = self.handle_sdp_offer(sdp)
        p.wait()
        p = self.negotiate()
        p.wait()
        logger.info(f"wrb after connect remote-description: {self.webrtc.get_property('remote-description')}")
        logger.info(f"wrb after connect local-description: {self.webrtc.get_property('local-description')}")

    def send_sdp_offer(self, sdp):
        """Send ``sdp`` to Janus synchronously by driving the loop to completion.

        ``run_until_complete`` raises ``RuntimeError`` when the loop is already
        running, so this is only usable while ``self.loop`` is stopped.
        """
        logger.info(f"begin send_sdp_offer with {sdp} in {current_thread()}")
        self.loop.run_until_complete(self.send_sdp(sdp))

    def send_sdp_schedule(self, sdp):
        """Schedule ``send_sdp(sdp)`` on the asyncio loop from any thread.

        This is reached from ``on_answer_created``, which GStreamer may invoke
        on one of its own threads.  ``asyncio.ensure_future`` is not thread-safe
        and does not wake a sleeping loop, so the coroutine is submitted with
        ``asyncio.run_coroutine_threadsafe`` instead.
        """
        future = asyncio.run_coroutine_threadsafe(self.send_sdp(sdp), self.loop)
        future.add_done_callback(self._log_send_failure)

    @staticmethod
    def _log_send_failure(future):
        """Log an exception raised by a scheduled ``send_sdp`` call."""
        if future.cancelled():
            return
        exc = future.exception()
        if exc is not None:
            logger.error("sending SDP answer to Janus failed", exc_info=exc)

    async def send_sdp(self, sdp):
        """Send the SDP answer to the Janus streaming plugin with a ``start`` request.

        Args:
            sdp: the local SDP answer as text.
        """
        logger.info(f"really begin sending sdp offer in {current_thread()}")
        response = await self.janus_plugin.async_send({
            "body": {"request": "start"},
            "jsep": {
                "sdp": sdp,
                "trickle": False,
                "type": "answer"
            },
        })
        logger.info(f"really sent sdp offer in {current_thread()}, got {response}")
async def main_test():
    """Connect a WebRTC client to the configured stream and keep it alive.

    Obtains the stream configuration via ``get_info``, builds a
    ``WebRTCClient`` from it, performs the connection handshake and then logs
    a heartbeat message every 30 seconds, forever.
    """
    stream_conf = await get_info()
    client = WebRTCClient(stream_conf)
    await client.connect()

    heartbeat_period = 30  # seconds between heartbeat log lines
    while True:
        await asyncio.sleep(heartbeat_period)
        logger.info("jord video receiver works")
if __name__ == "__main__":
    # asyncio.run creates and closes the event loop itself; calling
    # asyncio.get_event_loop() with no running loop is deprecated and fails on
    # recent Python versions.
    asyncio.run(main_test())
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement