Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from __future__ import (
- absolute_import, division, print_function, unicode_literals)
- import collections
- import time
- import sys
- import gi
- gi.require_version('Gst', '1.0')
- gi.require_version('GstPbutils', '1.0')
- from gi.repository import Gst, GLib, GstPbutils
- _Result = collections.namedtuple(
- 'Result', ('uri', 'tags', 'duration', 'seekable', 'mime'))
- class Signals(object):
- """Helper for tracking gobject signal registrations"""
- def __init__(self):
- self._ids = {}
- def connect(self, element, event, func, *args):
- """Connect a function + args to signal event on an element.
- Each event may only be handled by one callback in this implementation.
- """
- assert (element, event) not in self._ids
- self._ids[(element, event)] = element.connect(event, func, *args)
- def disconnect(self, element, event):
- """Disconnect whatever handler we have for an element+event pair.
- Does nothing it the handler has already been removed.
- """
- signal_id = self._ids.pop((element, event), None)
- if signal_id is not None:
- element.disconnect(signal_id)
- def clear(self):
- """Clear all registered signal handlers."""
- for element, event in self._ids.keys():
- element.disconnect(self._ids.pop((element, event)))
- class Scanner(object):
- def __init__(self, timeout=1000, proxy_config=None):
- self._timeout_ms = int(timeout)
- self._proxy_config = proxy_config or {}
- def scan(self, uri, timeout=None):
- timeout = int(timeout or self._timeout_ms)
- tags, duration, seekable, mime = None, None, None, None
- pipeline = _setup_pipeline(uri, self._proxy_config)
- _start_pipeline(pipeline)
- tags, mime = _process(pipeline, timeout)
- return _Result(uri, tags, duration, seekable, mime)
- def _setup_pipeline(uri, proxy_config=None):
- Gst.init([])
- print ("version: " + Gst.version_string())
- src = Gst.Element.make_from_uri(Gst.URIType.SRC, uri)
- if not src:
- print ("can't open file: " + uri)
- exit(0)
- typefind = Gst.ElementFactory.make('typefind')
- decodebin = Gst.ElementFactory.make('decodebin')
- pipeline = Gst.ElementFactory.make('pipeline')
- for e in (src, typefind, decodebin):
- pipeline.add(e)
- src.link(typefind)
- typefind.link(decodebin)
- signals = Signals()
- signals.connect(decodebin, 'pad-added', _pad_added, pipeline)
- return pipeline
- def _pad_added(element, pad, pipeline):
- sink = Gst.ElementFactory.make('fakesink')
- sink.set_property('sync', False)
- pipeline.add(sink)
- sink.sync_state_with_parent()
- pad.link(sink.get_static_pad('sink'))
- def _start_pipeline(pipeline):
- result = pipeline.set_state(Gst.State.PAUSED)
- if result == Gst.StateChangeReturn.NO_PREROLL:
- pipeline.set_state(Gst.State.PLAYING)
- def convert_taglist(taglist):
- result = collections.defaultdict(list)
- for n in range(taglist.n_tags()):
- tag = taglist.nth_tag_name(n)
- for i in range(taglist.get_tag_size(tag)):
- value = taglist.get_value_index(tag, i)
- if isinstance(value, GLib.Date):
- date = datetime.date(
- value.get_year(), value.get_month(), value.get_day())
- result[tag].append(date.isoformat().decode('utf-8'))
- if isinstance(value, Gst.DateTime):
- result[tag].append(value.to_iso8601_string().decode('utf-8'))
- elif isinstance(value, bytes):
- result[tag].append(value.decode('utf-8', 'replace'))
- else:
- pass
- return result
- def _process(pipeline, timeout_ms):
- bus = pipeline.get_bus()
- tags = {}
- mime = None
- missing_message = None
- types = (
- Gst.MessageType.ELEMENT |
- Gst.MessageType.APPLICATION |
- Gst.MessageType.ERROR |
- Gst.MessageType.EOS |
- Gst.MessageType.ASYNC_DONE |
- Gst.MessageType.TAG
- )
- timeout = timeout_ms
- previous = int(time.time() * 1000)
- while timeout > 0:
- message = bus.timed_pop_filtered(timeout * Gst.MSECOND, types)
- if message is None:
- break
- print (message.type)
- if message.type == Gst.MessageType.APPLICATION:
- if message.get_structure().get_name() == 'have-type':
- mime = message.get_structure().get_value('caps').get_name()
- if mime and (
- mime.startswith('text/') or mime == 'application/xml'):
- return tags, mime, have_audio
- elif message.type == Gst.MessageType.ERROR:
- if missing_message and not mime:
- caps = missing_message.get_structure().get_value('detail')
- mime = caps.get_structure(0).get_name()
- err, debug = message.parse_error()
- print("Error received from element %s: %s" % (
- message.src.get_name(), err))
- print("Debugging information: %s" % debug)
- return tags, mime
- elif message.type == Gst.MessageType.EOS:
- return tags, mime
- elif message.type == Gst.MessageType.ASYNC_DONE:
- return tags, mime
- elif message.type == Gst.MessageType.TAG:
- taglist = message.parse_tag()
- # Note that this will only keep the last tag.
- tags.update(convert_taglist(taglist))
- now = int(time.time() * 1000)
- timeout -= now - previous
- previous = now
- s = Scanner()
- result = s.scan("file://" + sys.argv[1])
- for key in ('uri', 'mime', 'duration', 'seekable'):
- print('%-20s %s' % (key, getattr(result, key)))
- print('tags')
- for tag, value in result.tags.items():
- print('%-20s %s' % (tag, value))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement