Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import gi
- gi.require_version("Gst", "1.0")
- from gi.repository import Gst, GstApp, GObject
- Gst.init(None)
- class CameraPipeline:
- def __init__(self):
- self.mainloop = GObject.MainLoop()
- self.pipeline = Gst.Pipeline()
- # Forward messages from the bins
- self.pipeline.set_property("message-forward", True)
- self.bus = self.pipeline.get_bus()
- self.bus.add_signal_watch()
- self.bus.connect("message::error", self.on_error)
- # Src
- self.src = Gst.ElementFactory.make("videotestsrc")
- # Appsink
- self.appsink = Gst.ElementFactory.make("appsink")
- self.appsink.set_property("emit-signals", True)
- self.appsink.connect("new-sample", self.on_new_src_buffer)
- # Appsrc
- self.metadata_src = Gst.ElementFactory.make("appsrc")
- # Appsink
- self.metadata_sink = Gst.ElementFactory.make("appsink")
- self.metadata_sink.set_property("emit-signals", True)
- self.metadata_sink.connect("new-sample", self.on_new_metadata_buffer)
- self.pipeline.add(self.src)
- self.pipeline.add(self.appsink)
- self.pipeline.add(self.metadata_src)
- self.pipeline.add(self.metadata_sink)
- self.src.link(self.appsink)
- self.metadata_src.link(self.metadata_sink)
- def on_new_src_buffer(self, appsink):
- sample = GstApp.AppSink.pull_sample(appsink)
- GstApp.AppSrc.push_sample(self.metadata_src, sample)
- print("Pushed to metadata_src")
- return Gst.FlowReturn.OK
- def on_new_metadata_buffer(self, appsink):
- print("Pulled in metadata_sink")
- return Gst.FlowReturn.OK
- def on_error(self, bus, msg):
- print("on_error():", msg.parse_error())
- def run(self):
- self.pipeline.set_state(Gst.State.PLAYING)
- self.mainloop.run()
- if __name__ == "__main__":
- pipeline = CameraPipeline()
- pipeline.run()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement