SHARE
TWEET

Untitled

a guest May 23rd, 2019 75 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. import gi
  2.  
  3. gi.require_version("Gst", "1.0")
  4. from gi.repository import Gst, GstApp, GObject
  5.  
  6. Gst.init(None)
  7.  
  8. class CameraPipeline:
  9.     def __init__(self):
  10.         self.mainloop = GObject.MainLoop()
  11.         self.pipeline = Gst.Pipeline()
  12.         # Forward messages from the bins
  13.         self.pipeline.set_property("message-forward", True)
  14.         self.bus = self.pipeline.get_bus()
  15.         self.bus.add_signal_watch()
  16.         self.bus.connect("message::error", self.on_error)
  17.  
  18.         # Src
  19.         self.src = Gst.ElementFactory.make("videotestsrc")
  20.  
  21.         # Appsink
  22.         self.appsink = Gst.ElementFactory.make("appsink")
  23.         self.appsink.set_property("emit-signals", True)
  24.         self.appsink.connect("new-sample", self.on_new_src_buffer)
  25.  
  26.         # Appsrc
  27.         self.metadata_src = Gst.ElementFactory.make("appsrc")
  28.  
  29.         # Appsink
  30.         self.metadata_sink = Gst.ElementFactory.make("appsink")
  31.         self.metadata_sink.set_property("emit-signals", True)
  32.         self.metadata_sink.connect("new-sample", self.on_new_metadata_buffer)
  33.  
  34.         self.pipeline.add(self.src)
  35.         self.pipeline.add(self.appsink)
  36.         self.pipeline.add(self.metadata_src)
  37.         self.pipeline.add(self.metadata_sink)
  38.  
  39.         self.src.link(self.appsink)
  40.         self.metadata_src.link(self.metadata_sink)
  41.  
  42.     def on_new_src_buffer(self, appsink):
  43.         sample = GstApp.AppSink.pull_sample(appsink)
  44.         GstApp.AppSrc.push_sample(self.metadata_src, sample)
  45.         print("Pushed to metadata_src")
  46.         return Gst.FlowReturn.OK
  47.  
  48.     def on_new_metadata_buffer(self, appsink):
  49.         print("Pulled in metadata_sink")
  50.         return Gst.FlowReturn.OK
  51.  
  52.     def on_error(self, bus, msg):
  53.         print("on_error():", msg.parse_error())
  54.  
  55.     def run(self):
  56.         self.pipeline.set_state(Gst.State.PLAYING)
  57.         self.mainloop.run()
  58.  
  59.  
  60. if __name__ == "__main__":
  61.     pipeline = CameraPipeline()
  62.     pipeline.run()
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top