Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from subprocess import check_output
- import json
- import requests
- from urllib.parse import urljoin
- from hashlib import md5
- from os import path
- import os
- import tempfile
- from urllib.request import urlretrieve
- import shutil
- from tqdm import tqdm
def read_config():
    """Load and return the parsed contents of ``config.json``.

    The file is looked up relative to the current working directory.
    It is read as UTF-8 explicitly, so non-ASCII values are decoded the same
    way regardless of the platform's locale encoding.

    Raises:
        FileNotFoundError: if ``config.json`` does not exist.
        json.JSONDecodeError: if the file is not valid JSON.
    """
    with open('config.json', encoding='utf-8') as f:
        return json.load(f)
# Parsed configuration, loaded once when the module is imported.
CONFIG = read_config()
def get_user_dir(name=None):
    """Return a directory path as reported by the ``xdg-user-dir`` command.

    Args:
        name: Optional directory name passed to the command as its single
            argument (e.g. ``'MUSIC'``). When ``None`` the command is run
            without arguments.

    Returns:
        The command's output as ``str``, without its trailing newline.

    Raises:
        subprocess.CalledProcessError: if the command exits non-zero.
        FileNotFoundError: if ``xdg-user-dir`` is not installed.
    """
    CMD = 'xdg-user-dir'
    argv = (CMD,) if name is None else (CMD, name)
    # Decode with the filesystem encoding: the output is a path, not text
    # guaranteed to be UTF-8.
    out = os.fsdecode(check_output(argv))
    # Strip the line terminator only when it is really there, instead of
    # unconditionally chopping the last character.
    return out[:-1] if out.endswith('\n') else out
# Root directory of the local music library: the configured 'music_dir' if
# present, otherwise whatever xdg-user-dir reports for MUSIC.
if 'music_dir' in CONFIG:
    MUSIC_DIR = CONFIG['music_dir']
else:
    MUSIC_DIR = get_user_dir('MUSIC')
class RelativeSession(requests.Session):
    """A ``requests.Session`` that resolves every URL against a base URL."""

    def __init__(self, base_url):
        """Create the session and remember ``base_url`` for later requests."""
        super().__init__()
        self.__base_url = base_url

    def request(self, method, url, **kwargs):
        """Join ``url`` onto the base URL, then delegate to ``Session.request``."""
        absolute_url = urljoin(self.__base_url, url)
        return super().request(method, absolute_url, **kwargs)
# Shared HTTP session; request paths used below are resolved against the
# server address taken from the configuration.
SERVER_ADDRESS = CONFIG['server_address']
SESSION = RelativeSession(base_url=SERVER_ADDRESS)
def login():
    """Log in with the phone/password from ``CONFIG['user']``.

    Returns:
        The value of ``account.id`` from the JSON response.

    Raises whatever ``raise_for_status`` raises for an HTTP error status.
    """
    credentials = CONFIG['user']
    query = {
        'phone': credentials['phone'],
        'password': credentials['password'],
    }
    response = SESSION.get('/login/cellphone', params=query)
    response.raise_for_status()
    account = response.json()['account']
    return account['id']
def get_playlists(uid):
    """Return the list of playlist ids the server reports for user ``uid``."""
    response = SESSION.get('/user/playlist', params={'uid': uid})
    response.raise_for_status()
    playlist_ids = []
    for entry in response.json()['playlist']:
        playlist_ids.append(entry['id'])
    return playlist_ids
def get_playlist_detail(plid):
    """Fetch playlist ``plid`` and reduce it to its name and track list.

    Returns:
        ``{'name': ..., 'tracks': [{'id': ..., 'name': ...}, ...]}``
    """
    response = SESSION.get('/playlist/detail', params={'id': plid})
    response.raise_for_status()
    playlist = response.json()['playlist']
    title = playlist['name']
    # Keep only the two fields the rest of the script uses.
    tracks = [
        {'id': entry['id'], 'name': entry['name']}
        for entry in playlist['tracks']
    ]
    return {'name': title, 'tracks': tracks}
def get_recommand_playlist():
    """Fetch the recommended songs and present them as a playlist dict.

    Returns:
        A dict shaped like the result of ``get_playlist_detail``: a fixed
        ``'name'`` plus ``'tracks'`` holding ``{'id', 'name'}`` entries taken
        from the response's ``recommend`` list.
    """
    response = SESSION.get('/recommend/songs')
    response.raise_for_status()
    recommended = response.json()['recommend']
    tracks = [
        {'id': entry['id'], 'name': entry['name']}
        for entry in recommended
    ]
    return {'name': '每日歌曲推荐', 'tracks': tracks}
def get_song_download_infos(snids):
    """Look up download information for the given song ids.

    Args:
        snids: Iterable of song ids; they are sent as one comma-separated
            ``id`` query parameter.

    Returns:
        A dict mapping song id to ``{'url', 'md5', 'type'}`` with the MD5
        lower-cased. Entries whose ``url`` is falsy are left out.
    """
    id_list = ','.join(str(snid) for snid in snids)
    response = SESSION.get('/song/url', params={'id': id_list})
    response.raise_for_status()

    infos = {}
    for entry in response.json()['data']:
        if not entry['url']:
            # No download offered for this song.
            continue
        song_id = entry['id']
        infos[song_id] = {
            'url': entry['url'],
            'md5': entry['md5'].lower(),
            'type': entry['type'],
        }
    return infos
def filemd5(ph):
    """Return the lowercase hex MD5 digest of the file at path ``ph``.

    The file is read in 4096-byte blocks so it never has to fit in memory.
    """
    digest = md5()
    with open(ph, 'rb') as stream:
        while True:
            block = stream.read(4096)
            if not block:
                break
            digest.update(block)
    return digest.hexdigest()
def escape_filename(fn):
    """Return ``fn`` with every ``/`` replaced by U+FFFD.

    This keeps the name usable as a single path component.
    """
    pieces = fn.split('/')
    return '\ufffd'.join(pieces)
class TqdmUpTo(tqdm):
    """Progress bar whose ``update_to`` matches ``urlretrieve``'s reporthook."""

    def update_to(self, b=1, bsize=1, tsize=None):
        """Advance the bar to ``b * bsize``.

        Args:
            b: Number of blocks transferred so far.
            bsize: Size of each block.
            tsize: Total size; when given, it becomes the bar's ``total``.
        """
        if tsize is not None:
            self.total = tsize
        transferred = b * bsize
        # update() takes an increment, so pass the delta from the current count.
        self.update(transferred - self.n)
def _collect_playlists():
    """Fetch the user's playlists plus the recommendation playlist.

    Every track dict is annotated in place with a 1-based ``track_no`` and,
    when download information is available for it, with ``url``, ``md5`` and
    ``type``.
    """
    uid = login()
    pls = [get_playlist_detail(plid) for plid in get_playlists(uid)]
    pls.append(get_recommand_playlist())
    infos = get_song_download_infos(
        {tr['id'] for pl in pls for tr in pl['tracks']})
    for pl in pls:
        for track_no, tr in enumerate(pl['tracks'], start=1):
            # Tracks without download info simply get no url/md5/type keys.
            tr.update(infos.get(tr['id'], {}))
            tr['track_no'] = track_no
    return pls


def _stash_existing(ps_plnames, tmpdir):
    """Hard-link every file of the existing playlist dirs into ``tmpdir``.

    Links are named after the file's MD5. Returns the set of digests found.
    """
    digests = set()
    for ps_pl in ps_plnames:
        d = path.join(MUSIC_DIR, ps_pl)
        for entry in os.listdir(d):
            p = path.join(d, entry)
            if not path.isfile(p):
                continue
            digest = filemd5(p)
            try:
                os.link(p, path.join(tmpdir, digest))
            except FileExistsError:
                # Same content was already stashed from another location.
                pass
            digests.add(digest)
    return digests


def _missing_tracks(pls, have):
    """Map digest -> {'url', 'name'} for wanted tracks not in ``have``."""
    wanted = {}
    for pl in pls:
        for tr in pl['tracks']:
            if 'md5' not in tr:
                continue
            digest = tr['md5']
            if digest not in have:
                wanted[digest] = {'url': tr['url'], 'name': tr['name']}
    return wanted


def _download_missing(wanted, tmpdir):
    """Download each wanted track into ``tmpdir`` under its MD5 digest.

    Raises:
        ValueError: if a downloaded file does not match its expected MD5.
    """
    for digest, info in tqdm(wanted.items(), unit='song'):
        tqdm.write('DOWNLOADING: {}'.format(info['name']))
        final = path.join(tmpdir, digest)
        # Download straight into the staging dir under a provisional name so
        # an incomplete or corrupt file is never mistaken for a good one and
        # is removed together with the staging dir.
        partial = final + '.part'
        with TqdmUpTo(unit='B', unit_scale=True, miniters=1, leave=False) as t:
            urlretrieve(info['url'], filename=partial, reporthook=t.update_to)
        if filemd5(partial) != digest:
            raise ValueError('hash mismatch')
        os.rename(partial, final)


def _rebuild_library(pls, ps_plnames, tmpdir, tolerate_missing):
    """Delete the old playlist dirs and recreate them from ``tmpdir``.

    Args:
        pls: Playlist dicts as produced by ``_collect_playlists``.
        ps_plnames: Names of the directories under MUSIC_DIR to delete.
        tmpdir: Staging dir holding one file per MD5 digest.
        tolerate_missing: When true, tracks absent from ``tmpdir`` are
            skipped (the download phase was cut short); otherwise a missing
            file raises ``FileNotFoundError``.
    """
    for ps_pl in ps_plnames:
        shutil.rmtree(path.join(MUSIC_DIR, ps_pl))
    for pl in pls:
        d = path.join(MUSIC_DIR, escape_filename(pl['name']))
        # Two playlists may share a name; the old dirs are already gone at
        # this point, so crashing here would lose the library.
        os.makedirs(d, exist_ok=True)
        for tr in pl['tracks']:
            if 'md5' not in tr:
                continue
            fn = escape_filename('{:02} - {}.{}'.format(
                tr['track_no'], tr['name'], tr['type']))
            try:
                os.link(path.join(tmpdir, tr['md5']), path.join(d, fn))
            except FileExistsError:
                # Only possible when same-named playlists were merged above.
                tqdm.write('SKIPPED (name collision): {}'.format(fn))
            except FileNotFoundError:
                if not tolerate_missing:
                    raise


def main():
    """Synchronise MUSIC_DIR with the user's playlists on the server.

    Steps:
      1. Fetch all playlists and the download info of their tracks.
      2. Hard-link every file currently in the playlist directories into a
         staging directory, keyed by MD5.
      3. Download the tracks whose MD5 is not staged yet.
      4. Remove every directory that was in MUSIC_DIR and recreate one
         directory per playlist, hard-linking tracks out of the staging dir.

    A ``KeyboardInterrupt`` or hash mismatch during step 3 does not abort the
    run: step 4 still rebuilds the library from whatever is staged, and the
    error is re-raised afterwards.
    """
    pls = _collect_playlists()
    ps_plnames = [d for d in os.listdir(MUSIC_DIR)
                  if path.isdir(path.join(MUSIC_DIR, d))]
    # The staging dir must live on the same filesystem as the library:
    # os.link cannot create hard links across devices, and the system temp
    # location is frequently a separate mount. It is created after the
    # listing above, so it is never treated as a playlist directory itself.
    with tempfile.TemporaryDirectory(dir=MUSIC_DIR, prefix='.sync-') as tmpdir:
        have = _stash_existing(ps_plnames, tmpdir)
        wanted = _missing_tracks(pls, have)
        err = None
        try:
            _download_missing(wanted, tmpdir)
        except (KeyboardInterrupt, ValueError) as e:
            err = e
        _rebuild_library(pls, ps_plnames, tmpdir,
                         tolerate_missing=err is not None)
    if err is not None:
        raise err
# Run the sync only when executed as a script, so that importing this module
# does not start network access or modify the music directory.
if __name__ == '__main__':
    main()
Add Comment
Please, Sign In to add comment