Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/python3
- import os, re, base64, time, sys, argparse, itertools
- import urllib.request as request
- from mutagen.oggvorbis import OggVorbis, OggVorbisHeaderError
- from mutagen.flac import Picture
- import youtube_dl
- from PIL import Image
- from io import BytesIO
# Command-line interface. Parsing happens at import time, so `args` is
# available to every statement below.
parser = argparse.ArgumentParser(
    description='Downloads songs from youtube videos and tags them based on description and metadata',
)
parser.add_argument(
    'videos',
    metavar='video',
    nargs='*',
    help='A video to download and tag. Use youtube id or url',
)
# A dedicated -s/--stdin switch was drafted here but is disabled:
# parser.add_argument('-s', '--stdin', action='store_true', help='Read videos from stdin in addition to arguments')
parser.add_argument(
    '-d', '--directory',
    default='/home/zenith/Music/Library/ytdl/others',
    help='Directory to download songs to. Defaults to /home/zenith/Music/Library/ytdl/others',
)
parser.add_argument(
    '-n', '--no-tag',
    action='store_true',
    help="Don't tag, just download",
)
parser.add_argument(
    '-o', '--allow-overwrite',
    action='store_true',
    help='Allow downloaded file to overwrite existing file',
)
parser.add_argument(
    '-c', '--description',
    action='store_true',
    help='Download description',
)
args = parser.parse_args()
# "Key: value" lines, e.g. "Artist: Someone". Group 1 is the (possibly
# multi-word) key, group 2 the value. The separator may be an ASCII colon or a
# full-width colon (U+FF1A); the original alternation listed the same colon
# twice, presumably a full-width colon lost in transcoding -- TODO confirm.
kvPattern = re.compile(r'\b((?:\w+\s+)*\w+)\s*[:\uFF1A]\s*([^\s].*)$', re.UNICODE)

# "[Key] value" lines, e.g. "[Vocal] Someone". Group 1 is the key, group 2 the
# value (words separated by whitespace). No leading \b: a word boundary in
# front of "[" would require a word character right before the bracket, so a
# line or title that *starts* with "[Key]" could never match.
kvPattern2 = re.compile(r'\[((?:\w+\s+)*\w+)\]\s*(\w+(?:\s+\w+)*)$', re.UNICODE)

# "Artist - Title" video titles. Group 1 is the text before the first
# whitespace-delimited hyphen, group 2 everything after it.
titlePattern = re.compile(r'^(.*?)\s+\-\s+(.*)$', re.UNICODE)
# Lookup from a lower-cased description key to the Vorbis comment field it
# fills. Declared as field -> accepted keys and flattened below, so all
# spellings that feed one field sit next to each other.
_FIELD_ALIASES = (
    ('BPM', ('bpm',)),
    ('ARTIST', ('artist',)),
    ('TITLE', ('song', 'title', 'track')),
    ('GENRE', ('genre',)),
    ('LANGUAGE', ('language',)),
    ('ALBUM', ('album', 'disc')),
    ('WEBSITE', ('website', 'site')),
    ('ORIGINALALBUM', ('from',)),
    ('ORIGINAL', ('original', 'source')),
    ('KEY', ('key',)),
    ('ARRANGER', ('arrangement', 'arranger', 'arrange')),
    ('LYRICIST', ('lyric', 'lyrics', 'lyricist')),
)
tagdict = {
    alias: field
    for field, aliases in _FIELD_ALIASES
    for alias in aliases
}
def _description_tags(description):
    """Collect tags from ``Key: value`` / ``[Key] value`` description lines.

    Each line is tried against ``kvPattern`` first and ``kvPattern2`` second.
    Keys are compared lower-cased; keys listed in ``tagdict`` map to their
    Vorbis field, later lines overriding earlier ones.
    """
    tags = {}
    for line in description.split('\n'):
        match = re.search(kvPattern, line) or re.search(kvPattern2, line)
        if not match:
            continue
        key, value = match.group(1, 2)
        key = key.lower()
        if key in tagdict:
            tags[tagdict[key]] = value
        if key in ('circle', 'group'):
            # A circle/group is only a fallback artist. The original tested
            # the lower-case key 'artist', which is never stored, so it
            # overwrote an explicit ARTIST every time.
            tags.setdefault('ARTIST', value)
        elif key in ('singer', 'vocals', 'vocalist', 'vocal'):
            tags['VOCALIST'] = value
            tags['VOCAL'] = 'Yes'
    return tags


def _thumbnail_block(url):
    """Download the image at *url* and return it as a base64 picture block.

    The result is a serialized mutagen ``Picture`` encoded as ASCII base64,
    the form stored under ``METADATA_BLOCK_PICTURE``.
    """
    picture = Picture()
    picture.type = 3  # FLAC picture type 3 is the front cover
    with request.urlopen(url) as response:
        picture.mime = response.headers.get_content_type()
        data = response.read()
    # Pillow is used only to read the pixel dimensions.
    image = Image.open(BytesIO(data))
    picture.width = image.width
    picture.height = image.height
    picture.data = data
    return base64.b64encode(picture.write()).decode('ascii')


def tag(vorbis, information, log=None):
    """Replace the tags of *vorbis* with ones derived from video metadata.

    Tags are gathered, in this order, from the description's key/value lines,
    from the video title (``Artist - Title`` or ``[Artist] Title``) with the
    uploader as fallback artist, from a ``lyrics:`` paragraph, and from the
    thumbnail image. All existing tags are then cleared, the new ones are
    written, and the file is saved.

    Args:
        vorbis: Mapping-like tag container providing ``clear()``, item
            assignment and ``save()`` (an ``OggVorbis`` instance in this file).
        information: Info dict with the keys ``title``, ``uploader`` and
            ``id``; ``description``, ``webpage_url`` and ``thumbnail`` are
            optional. A missing or ``None`` description is treated as empty.
        log: Optional callable taking one message string. Defaults to a no-op.

    Raises:
        KeyError: If ``title``, ``uploader`` or ``id`` is missing.
    """
    if log is None:
        def log(message):
            return None

    title = information['title']
    description = information.get('description') or ''
    channel = information['uploader']
    video_id = information['id']

    tags = _description_tags(description)
    log('Added tags from description')

    # Values found in the description win; the title and uploader only fill
    # in what is still missing.
    title_match = re.match(titlePattern, title) or re.match(kvPattern2, title)
    if title_match:
        tags.setdefault('ARTIST', title_match.group(1).strip())
        tags.setdefault('TITLE', title_match.group(2).strip())
    else:
        tags.setdefault('TITLE', title)
        tags.setdefault('ARTIST', channel)
    if 'vocal' in title.lower():
        tags['VOCAL'] = 'Yes'
    tags['UPLOADER'] = channel
    tags['VIDEO_ID'] = video_id
    if 'webpage_url' in information:
        tags['VIDEO_WEBPAGE'] = information['webpage_url']
    log('Added tags from title and channel')

    # NOTE(review): without re.DOTALL this only captures a value that sits on
    # the "lyrics:" line itself and is directly followed by a blank line or
    # the end of the description. Confirm whether multi-line lyrics were meant.
    lyrics = re.search(r'lyrics:(.*?)(?:\n\n|$)', description,
                       re.UNICODE | re.IGNORECASE)
    if lyrics:
        tags['LYRICS'] = lyrics.group(1)
        tags['VOCAL'] = 'Yes'

    if 'thumbnail' in information:
        try:
            tags['METADATA_BLOCK_PICTURE'] = _thumbnail_block(information['thumbnail'])
        except (OSError, ValueError) as error:
            # Cover art is best effort: HTTP/network failures, malformed URLs
            # and undecodable images must not prevent the text tags from
            # being written.
            log('Skipping thumbnail image: {0}'.format(error))
        else:
            log('Added thumbnail image')

    # clear() already removes every existing tag; no per-key deletion needed.
    vorbis.clear()
    log('Removed existing tags')
    for name, value in tags.items():
        vorbis[name] = value
    vorbis.save()
    log('Saved tags to file')
class Tagger(youtube_dl.postprocessor.common.PostProcessor):
    """Post-processor that writes metadata tags into downloaded ``.ogg`` files."""

    def run(self, information):
        """Tag the file named by ``information['filepath']``.

        Files that do not end in ``.ogg`` are passed through untouched. A file
        whose Ogg Vorbis header cannot be read is renamed to a dot-prefixed
        name in the same directory and ``information['filepath']`` is updated
        to the new location.

        Returns:
            The tuple ``([], information)``.
        """
        def log(message):
            self._downloader.to_screen('[tagger] ' + message)

        filename = information['filepath']
        if not filename.endswith('.ogg'):
            return [], information

        log('Tagging {0}'.format(filename))
        try:
            vorbis = OggVorbis(filename)
        except OggVorbisHeaderError:
            log('Unable to read. Renaming and Skipping.')
            # Prefix only the basename: '.' + filename would turn a path like
            # /music/a.ogg into ./music/a.ogg, i.e. a different directory.
            directory, basename = os.path.split(filename)
            hidden = os.path.join(directory, '.' + basename)
            os.rename(filename, hidden)
            information['filepath'] = hidden
        else:
            tag(vorbis, information, log)
            log('Successfully tagged')
        return [], information
class CollisionError(Exception):
    """Raised by AntiOverwrite.run when the target .ogg file already exists."""
class AntiOverwrite(youtube_dl.postprocessor.common.PostProcessor):
    """Post-processor that refuses to replace an already existing ``.ogg`` file."""

    def run(self, information):
        """Abort when the ``.ogg`` counterpart of the downloaded file exists.

        The counterpart is ``information['filepath']`` with its final
        extension replaced by ``.ogg``. If it exists, the downloaded file is
        deleted and ``CollisionError`` is raised.

        Returns:
            The tuple ``([], information)`` when there is no collision.

        Raises:
            CollisionError: If the ``.ogg`` target already exists.
        """
        # splitext only strips the last extension of the final path component.
        # The previous regex cut at the first dot anywhere in the path, so a
        # dotted directory or a title such as "feat. X" gave a wrong target.
        oggfilename = os.path.splitext(information['filepath'])[0] + '.ogg'
        # NOTE(review): if the download itself is already an .ogg file the
        # target equals the source, so the fresh download is removed -- confirm
        # this is intended before re-enabling this post-processor.
        if os.path.exists(oggfilename):
            self._downloader.to_screen('[collision_detector] File exists, skipping: {0}'.format(oggfilename))
            os.remove(information['filepath'])
            raise CollisionError()
        return [], information
# Output file name template; placed inside the target directory when one is
# configured. parser.error() exits the program if the directory is missing.
outtmpl = '%(uploader)s - %(title)s [%(id)s].%(ext)s'
if args.directory:
    if os.path.isdir(args.directory):
        outtmpl = os.path.join(args.directory, outtmpl)
    else:
        parser.error('Not a directory: {0}'.format(args.directory))

# Options handed to youtube_dl.YoutubeDL below.
youtube_dl_options = dict(
    format='bestaudio/best',
    outtmpl=outtmpl,
    writedescription=args.description,
    logtostderr=True,
    nooverwrites=not args.allow_overwrite,
)
# Videos come from the command line; when none are given, one video per line
# is read from stdin. Blank lines are skipped so an empty string is never
# handed to the downloader as a video id.
videos = args.videos or (
    stripped for stripped in (line.strip() for line in sys.stdin) if stripped
)

with youtube_dl.YoutubeDL(youtube_dl_options) as downloader:
    # NOTE: the AntiOverwrite post-processor is deliberately not registered;
    # overwrite protection is delegated to the 'nooverwrites' option and to
    # nopostoverwrites on the audio extractor below.
    downloader.add_post_processor(
        youtube_dl.postprocessor.ffmpeg.FFmpegExtractAudioPP(
            downloader,
            preferredcodec='vorbis',
            preferredquality='0',
            nopostoverwrites=not args.allow_overwrite,
        )
    )
    if not args.no_tag:
        downloader.add_post_processor(Tagger())

    for video in videos:
        try:
            downloader.download([video])
        except CollisionError:
            # Only raised by AntiOverwrite; the existing file is kept.
            pass
        except Exception as ex:
            # Top-level boundary: report the failure and carry on with the
            # remaining videos instead of aborting the whole batch.
            print('Error occured downloading video {0}: {1}'.format(video, ex), file=sys.stderr)
        print('Finished video: {0}'.format(video))
Add Comment
Please, Sign In to add comment