Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import gi
- gi.require_version('GObject', '2.0')
- gi.require_version('Gst', '1.0')
- gi.require_version('GstApp', '1.0')
- gi.require_version('GstPbutils', '1.0')
- from gi.repository import GObject, Gst, GstApp, GstPbutils
- from os import walk
- from os.path import abspath
- from datetime import timedelta
- from os import putenv
- putenv('GST_DEBUG', '2')
- GObject.threads_init()
- Gst.init(None)
- class Importer:
- def __init__(self, tracks, album_covers, path_list):
- self.tracks = tracks
- self.album_covers = album_covers
- self.path_list = path_list
- self.file_list = []
- self.mainloop = GObject.MainLoop()
- self.discoverer = GstPbutils.Discoverer()
- self.discoverer.set_property('timeout', 5 * Gst.SECOND)
- self.discoverer.connect('discovered', self.parse_results)
- self.discoverer.connect('finished', self.on_finished)
- self.image_pipeline = Gst.Pipeline()
- self.appsrc = Gst.ElementFactory.make('appsrc', None)
- encoder = Gst.ElementFactory.make('pngenc', None)
- filesink = Gst.ElementFactory.make('filesink', None)
- self.image_pipeline.add(self.appsrc)
- self.image_pipeline.add(encoder)
- self.image_pipeline.add(filesink)
- self.appsrc.set_property('stream-type', GstApp.AppStreamType.RANDOM_ACCESS)
- self.appsrc.connect('seek-data', self.image_pipeline_seek)
- filesink.set_property('location', '/tmp/test.png')
- self.appsrc.link(encoder)
- encoder.link(filesink)
- def get_file_list(self):
- for top in self.path_list:
- for root, dirs, files in walk(top):
- for f in files:
- self.file_list.append(abspath(root + '/' + f))
- def on_finished(self, discoverer, *user_data):
- self.stop()
- def parse_results(self, discoverer, info, error, *user_data):
- result = info.get_result()
- uri = info.get_uri()
- tags = info.get_tags()
- duration = info.get_duration()
- if result == GstPbutils.DiscovererResult.OK:
- track = {}
- track['uri'] = uri
- track['duration'] = duration
- for tag in ['title', 'artist', 'album', 'genre']:
- exists, value = tags.get_string(tag)
- if exists:
- track[tag] = value
- exists, sample = tags.get_sample('image')
- if exists:
- source_buffer = sample.get_buffer()
- self.appsrc.set_property('caps', sample.get_caps())
- self.appsrc.set_property('size', source_buffer.get_size())
- self.appsrc.connect('need-data', self.push_image, source_buffer)
- self.image_pipeline.set_state(Gst.State.PLAYING)
- self.tracks.insert(track)
- else:
- # TODO: Do something about the various cases here.
- pass
- def push_image(self, appsrc, size, source_buffer):
- print('pushing image...')
- self.appsrc.emit('push-buffer', source_buffer)
- self.appsrc.disconnect('need-data')
- self.appsrc.emit('end-of-stream')
- self.image_pipeline.set_state(Gst.State.NULL)
- print('...done')
- def image_pipeline_seek(self, appsrc, offset):
- if offset == 0:
- return True
- else:
- return False
- def start(self):
- self.discoverer.start()
- for f in self.file_list:
- self.discoverer.discover_uri_async(Gst.filename_to_uri(f))
- print(f)
- self.mainloop.run()
- def stop(self):
- self.discoverer.stop()
- self.mainloop.quit()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement