Advertisement
Guest User

video_receiver.py

a guest
Jul 4th, 2022
82
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 6.91 KB | None | 0 0
  1. import asyncio
  2. from threading import current_thread
  3. import aiohttp
  4. import os
  5. import logging
  6. import sys
  7.  
  8. import gi
  9. gi.require_version('Gst', '1.0')
  10. gi.require_version('GstWebRTC', '1.0')
  11. gi.require_version('GstSdp', '1.0')
  12. from gi.repository import GLib, Gst, GstWebRTC, GstSdp
  13.  
  14. from dvr_processor.dvr_processor.janus import JanusSession
  15.  
  16. logging.basicConfig(level=logging.INFO,
  17.                     stream=sys.stdout,
  18.                     format='%(levelname)-8s %(name)s:%(message)s')
  19. logging.getLogger("libav.h264").setLevel(logging.CRITICAL)
  20. logging.getLogger("codec.h264").setLevel(logging.CRITICAL)
  21. logging.getLogger("dvr_processor.dvr_processor.janus").setLevel(logging.DEBUG)
  22.  
  23. logger = logging.getLogger(__name__)
  24.  
  25. async def get_info():
  26.     session = aiohttp.ClientSession(auth=aiohttp.BasicAuth(
  27.         os.environ["DVR_API_LOGIN"], os.environ["DVR_API_PASSWORD"]))
  28.     lge_url = os.environ["DVR_API_HOST"]
  29.     # wait backend
  30.     await asyncio.sleep(30) # FIXME
  31.     resp = await session.get(f"{lge_url}/api/videostream/janus/all")
  32.     resp.raise_for_status()
  33.     j = await resp.json()
  34.     await session.close()
  35.     return j[0]
  36.  
  37.  
  38. class WebRTCClient:
  39.     def __init__(self, conf): # FIXME
  40.         # see https://gitlab.freedesktop.org/gstreamer/gst-examples/-/blob/master/webrtc/sendrecv/gst/webrtc_sendrecv.py
  41.         self.mp = conf["rtspMountpointId"]
  42.         self.dp = conf["dataMountpointId"]
  43.         janus_port = conf["janusPort"]
  44.         Gst.init(None)
  45.  
  46.         http_session = session = aiohttp.ClientSession()
  47.         janus_url = f"http://{os.environ['JANUS_HOST']}:{janus_port}/janus"
  48.         self.janus_session = JanusSession(http_session, janus_url)
  49.         self.loop = asyncio.get_event_loop()
  50.         logger.info(f"initializing in {current_thread()}")
  51.    
  52.     def start_pipeline(self):
  53.         logger.info(f"begin start_pipeline in {current_thread()}")
  54.         self.pipe = Gst.parse_launch("webrtcbin name=wrb")
  55.         self.webrtc = self.pipe
  56.         self.webrtc.connect('pad-added', self.on_incoming_stream)
  57.         self.pipe.set_state(Gst.State.PLAYING)
  58.         logger.info("end start_pipeline")
  59.    
  60.     def remote_sdp(self):
  61.         try:
  62.             descr = self.webrtc.get_property('remote-description')
  63.             try:
  64.                 if descr is not None:
  65.                     return descr.sdp.as_text()
  66.                 else:
  67.                     return descr
  68.             except:
  69.                 return descr
  70.         except:
  71.             return "Failed get_property"
  72.        
  73.     def negotiate(self):
  74.         logger.info(f"begin negotiate in {current_thread()}")
  75.         logger.info(f"negotiate -- remote-description sdp: {self.remote_sdp()}")
  76.         promise = Gst.Promise.new_with_change_func(self.on_answer_created, self.webrtc, None)
  77.         self.webrtc.emit('create-answer', None, promise)
  78.         logger.info("end negotiate")
  79.         return promise
  80.    
  81.     def on_answer_created(self, promise, _, __):
  82.         logger.info(f"begin on_answer_created in {current_thread()}")
  83.         promise.wait()
  84.         answer = promise.get_reply().get_value("answer")
  85.         logger.info(f"answer: {answer}")
  86.         logger.info(f"answer fields: {answer.type}")
  87.         logger.info(f"answer fields: {answer.type} {answer.type.real}")
  88.         logger.info(f"answer fields: {answer.type} {answer.type.to_string(answer.type)}")
  89.         logger.info(f"answer sdp is None: {answer.sdp is None}")
  90.         logger.info(f"answer fields: {answer.sdp}")
  91.         sdp = answer.sdp.as_text()
  92.         logger.info(f"answer sdp: {sdp}")
  93.         promise2 = Gst.Promise.new()
  94.         self.webrtc.emit('set-local-description', answer, promise2)
  95.         logger.info(f"middle on_answer_created, after set-local-description")
  96.         self.send_sdp_schedule(sdp)
  97.         logger.info("end on_answer_created")
  98.    
  99.     def on_incoming_stream(self, _, pad):
  100.         logger.info(f"begin on_incoming stream in {current_thread()}")
  101.         if pad.direction != Gst.PadDirection.SRC:
  102.             return
  103.         sink = Gst.ElementFactory.make("fakesink")
  104.         self.pipe.add(sink)
  105.         sink.sync_state_with_parent()
  106.         self.webrtc.link(sink)
  107.         logger.info(f"added stream in {current_thread()}")
  108.    
  109.     def handle_sdp_offer(self, sdp):
  110.         logger.info(f"begin handle_sdp_offer")
  111.         res, sdpmsg = GstSdp.SDPMessage.new()
  112.         assert res == GstSdp.SDPResult.OK, f"unexpected res {res}"
  113.         GstSdp.sdp_message_parse_buffer(bytes(sdp.encode()), sdpmsg)
  114.         offer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.OFFER, sdpmsg)
  115.         promise = Gst.Promise.new()
  116.         logger.info(f"wrb target state: {self.webrtc.get_property('signaling-state')}")
  117.         self.webrtc.emit('set-remote-description', offer, promise)
  118.         logger.info("end handle_sdp_offer")
  119.         return promise
  120.  
  121.     async def get_sdp_offer(self):
  122.         await self.janus_session.create()
  123.         plugin = self.janus_plugin = await self.janus_session.attach("janus.plugin.streaming")
  124.         response = await plugin.async_send(
  125.             {"body": {
  126.                 "request": "watch",
  127.                 "id": self.mp
  128.             }})
  129.         assert (response["jsep"]["type"] == "offer")
  130.         sdp = response["jsep"]["sdp"]
  131.         logger.info(f"received SDP offer in {current_thread()}: {sdp}")
  132.         return sdp
  133.  
  134.     async def connect(self):
  135.         logger.info(f"begin connect in {current_thread()}")
  136.         sdp = await self.get_sdp_offer()
  137.         self.start_pipeline()
  138.         p = self.handle_sdp_offer(sdp)
  139.         p.wait()
  140.         p = self.negotiate()
  141.         p.wait()
  142.         logger.info(f"wrb after connect remote-description: {self.webrtc.get_property('remote-description')}")
  143.         logger.info(f"wrb after connect local-description: {self.webrtc.get_property('local-description')}")
  144.    
  145.     def send_sdp_offer(self, sdp):
  146.         logger.info(f"begin send_sdp_offer with {sdp} in {current_thread()}")
  147.         self.loop.run_until_complete(self.send_sdp(sdp))
  148.    
  149.     def send_sdp_schedule(self, sdp):
  150.         asyncio.ensure_future(self.send_sdp(sdp), loop=self.loop)
  151.        
  152.     async def send_sdp(self, sdp):
  153.         logger.info(f"really begin sending sdp offer in {current_thread()}")
  154.         response = await self.janus_plugin.async_send({
  155.             "body": { "request": "start" },
  156.             "jsep": {
  157.                 "sdp": sdp,
  158.                 "trickle": False,
  159.                 "type": "answer"
  160.             },
  161.         })
  162.         logger.info(f"really sent sdp offer in {current_thread()}, got {response}")# asyncio.ensure_future(send_sdp(offer.sdp.as_text()), loop=self.loop)
  163.  
  164.  
  165.  
  166. async def main_test():
  167.     conf = await get_info()
  168.     wrb = WebRTCClient(conf)
  169.     await wrb.connect()
  170.     while True:
  171.         await asyncio.sleep(30)
  172.         logger.info("jord video receiver works")
  173.  
  174. if __name__ == "__main__":
  175.     asyncio.get_event_loop().run_until_complete(main_test())
  176.  
  177.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement