Advertisement
Guest User

Untitled

a guest
Mar 4th, 2025
93
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 2.31 KB | None | 0 0
  1. import gi
  2. gi.require_version('Gst', '1.0')
  3. from gi.repository import Gst, GLib
  4. import threading
  5.  
  6. class GstRelay:
  7.     def __init__(self):
  8.         Gst.init(None)
  9.  
  10.         # Pipeline 1: Generate video frames
  11.         self.pipeline1 = Gst.parse_launch(
  12.             "videotestsrc is-live=true ! video/x-raw,format=RGB,width=640,height=480 ! videoconvert ! appsink name=sink"
  13.         )
  14.         self.appsink = self.pipeline1.get_by_name("sink")
  15.         self.appsink.set_property("emit-signals", True)
  16.         self.appsink.connect("new-sample", self.on_new_sample)
  17.  
  18.         # Pipeline 2: Receive video frames
  19.         self.pipeline2 = Gst.parse_launch(
  20.             "appsrc name=src is-live=true format=time ! videoconvert ! autovideosink"
  21.         )
  22.         self.appsrc = self.pipeline2.get_by_name("src")
  23.         self.appsrc.set_property("caps", Gst.Caps.from_string("video/x-raw,format=RGB,width=640,height=480"))
  24.  
  25.     def on_new_sample(self, sink):
  26.         """Callback when new frame arrives at appsink (Pipeline 1)."""
  27.         sample = sink.emit("pull-sample")
  28.         if not sample:
  29.             return Gst.FlowReturn.ERROR
  30.  
  31.         buffer = sample.get_buffer()
  32.         success, map_info = buffer.map(Gst.MapFlags.READ)
  33.         if not success:
  34.             return Gst.FlowReturn.ERROR
  35.  
  36.         # Create a new Gst.Buffer for appsrc
  37.         new_buffer = Gst.Buffer.new_allocate(None, len(map_info.data), None)
  38.         new_buffer.fill(0, map_info.data)
  39.         new_buffer.pts = buffer.pts
  40.         new_buffer.dts = buffer.dts
  41.         new_buffer.duration = buffer.duration
  42.  
  43.         buffer.unmap(map_info)
  44.  
  45.         # Push buffer to appsrc (Pipeline 2)
  46.         self.appsrc.emit("push-buffer", new_buffer)
  47.  
  48.         return Gst.FlowReturn.OK
  49.  
  50.     def start(self):
  51.         """Start both pipelines."""
  52.         self.pipeline1.set_state(Gst.State.PLAYING)
  53.         self.pipeline2.set_state(Gst.State.PLAYING)
  54.  
  55.     def stop(self):
  56.         """Stop both pipelines."""
  57.         self.pipeline1.set_state(Gst.State.NULL)
  58.         self.pipeline2.set_state(Gst.State.NULL)
  59.  
  60. def main():
  61.     gst_relay = GstRelay()
  62.     gst_relay.start()
  63.  
  64.     loop = GLib.MainLoop()
  65.     try:
  66.         loop.run()
  67.     except KeyboardInterrupt:
  68.         print("Stopping pipelines...")
  69.         gst_relay.stop()
  70.  
  71. if __name__ == "__main__":
  72.     main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement