Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/python3
- import sys
- import gi
- gi.require_version('Gst', '1.0')
- from gi.repository import Gst
- import time
- import threading
- import os
- import stat
- def mkfifo(fifo):
- if os.path.exists(fifo):
- if stat.S_ISFIFO(os.stat(fifo).st_mode):
- pass
- else:
- os.remove(fifo)
- os.mkfifo(fifo)
- else:
- os.mkfifo(fifo)
- def control():
- global pads
- FIFO="/tmp/fifo"
- mkfifo(FIFO)
- idx = 0
- while True:
- with open(FIFO) as fifo:
- line=fifo.read().strip()
- if "=" not in line:
- print("Invalid line.")
- continue
- key = line.split("=")[0]
- val = line.split("=")[1]
- if key == "switch":
- idx = (idx+1) % len(pads)
- switch(idx)
- def switch(idx):
- global pads
- global pipeline
- global switcher
- newpad = switcher.get_static_pad(pads[idx])
- pipeline.set_state(Gst.State.PAUSED)
- switcher.set_property('active-pad', newpad)
- pipeline.set_state(Gst.State.PLAYING)
- if __name__ == "__main__":
- # initialization
- Gst.init(None)
- Gst.debug_set_active(True)
- Gst.debug_set_default_threshold(3)
- img = "multifilesrc name=img location=logo.png ! pngdec ! imagefreeze ! videobox border-alpha=0 top=-180 left=-10 ! mix."
- out = "x264enc ! mpegtsmux ! filesink location=out.mp4"
- out = "autovideosink sync=false "
- inputs = [
- "videotestsrc do-timestamp=true pattern=0 ! video/x-raw,width=800,height=600",
- "multifilesrc do-timestamp=true location=regret.png ! pngdec ! imagefreeze",
- "videotestsrc do-timestamp=true pattern=1 ! video/x-raw,width=800,height=600",
- "rtmpsrc do-timestamp=true name=rtmp location=rtmp://192.168.2.120:1935/test/movie ! flvdemux ! decodebin ! videoconvert",
- ]
- p = "videomixer name=mix ! videoconvert ! " + out + " ! in. ".join(inputs) + " ! in. input-selector sync-streams=true sync-mode=1 name=in ! mix. " + img
- print(p)
- pipeline = Gst.parse_launch (p)
- if pipeline == None:
- print ("Failed to create pipeline")
- sys.exit(0)
- # run
- switcher = pipeline.get_by_name('in')
- i = pipeline.get_by_name('img')
- #pads = [ p.get_name() for p in switch.pads if p.get_direction() == Gst.PadDirection.GST_PAD_SINK ]
- pads = [ p.get_name() for p in switcher.pads if p.get_name() != "src" ]
- pipeline.set_state(Gst.State.PLAYING)
- t = threading.Thread(target=control, daemon=True)
- t.start()
- while True:
- time.sleep(10)
- # cleanup
- pipeline.set_state(Gst.State.NULL)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement