Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import peewee
- from requests import get, head
- from json import loads
- from time import sleep, perf_counter
- from io import BytesIO
- from PIL import Image
- from dateutil.parser import parse
# SQLite database stored in the file 'lastfm.db'; each model in this file
# binds to it through its inner Meta class.
database = peewee.SqliteDatabase('lastfm.db')
class Artists(peewee.Model):
    """Artist table: one row per artist name."""

    # Artist name; declared unique, so it identifies the row.
    name = peewee.CharField(unique=True)
    # Optional picture blob; albumworker stores the result of converttojpeg
    # here, or nothing when no picture was found.
    image = peewee.BlobField(null=True)

    class Meta:
        database = database
class Albums(peewee.Model):
    """Album table: one row per imported release, linked to its artist."""

    # Owning artist row.
    artist = peewee.ForeignKeyField(Artists)
    title = peewee.CharField()
    # Integer column; albumworker stores only the release year here.
    release_date = peewee.IntegerField()
    # Label name; albumworker falls back to 'Not on Label'.
    label = peewee.CharField()
    # Optional cover blob; albumworker stores the result of converttojpeg.
    cover = peewee.BlobField(null=True)

    class Meta:
        database = database
class Tracks(peewee.Model):
    """Track table: one row per track of an imported album."""

    # Owning album row.
    album = peewee.ForeignKeyField(Albums)
    # Track number converted to an integer by albumworker.
    position = peewee.IntegerField()
    title = peewee.CharField()
    # Recording length divided by 1000, as computed by albumworker
    # (presumably seconds -- confirm the unit of the source value).
    duration = peewee.IntegerField()

    class Meta:
        database = database
# Last.fm API key, sent as 'api_key' with every ws.audioscrobbler.com request.
# NOTE(review): the key is hard-coded in the source; consider loading it from
# configuration instead.
api_key = '665896559612c89558eb569d04fb7423'
# Last.fm user name, sent as 'user' with the same requests.
user = 'campoviy'
def converttojpeg(image_data, image_format):
    """Re-encode raw image bytes as JPEG.

    ``image_data`` is the downloaded image content.  ``image_format`` is the
    source format tag; callers in this file pass the last three characters
    of the image URL.

    * ``'png'``: the image is flattened onto a white background (the size is
      left unchanged).
    * ``'jpg'``: the image is resized to 300x300.
      NOTE(review): only this branch resizes -- confirm that is intended.

    Returns the JPEG bytes, or ``None`` when the tag is not one of the two
    above or the data cannot be decoded/encoded.
    """
    try:
        image = Image.open(BytesIO(image_data))
        if image_format == 'png':
            # Convert first so that PNGs without an alpha band (RGB, palette,
            # greyscale) still provide a fourth band to use as the mask.
            rgba = image.convert('RGBA')
            result = Image.new('RGB', rgba.size, (255, 255, 255))
            result.paste(rgba, mask=rgba.split()[3])
        elif image_format == 'jpg':
            # Image.ANTIALIAS was removed in Pillow 10; LANCZOS is the same
            # filter under its current name.
            result = image.resize((300, 300), Image.LANCZOS)
        else:
            return None
        with BytesIO() as f:
            result.save(f, format='JPEG', subsampling=0, quality=80)
            return f.getvalue()
    except Exception:
        # Deliberately best-effort: a picture that cannot be converted is
        # simply skipped.  Unlike a bare ``except`` this still lets
        # KeyboardInterrupt and SystemExit through.
        return None
def musicbrainz_get_artist_id(artist):
    """Look up an artist's MusicBrainz id (mbid) by name.

    Sleeps one second before the request (presumably to stay within the
    MusicBrainz rate limit), then searches for the name as a quoted phrase
    and takes the first hit.

    Returns the id of the first search result, or ``None`` when the response
    contains no usable result (e.g. the artist is not in MusicBrainz under
    this spelling).
    """
    sleep(1)
    try:
        # The name is sent as a quoted phrase, so backslashes and double
        # quotes inside it must be escaped or the query becomes malformed.
        phrase = '"' + artist.replace('\\', '\\\\').replace('"', '\\"') + '"'
        response = get('http://musicbrainz.org/ws/2/artist',
                       params={'query': phrase, 'limit': 1, 'fmt': 'json'})
        return loads(response.text)['artists'][0]['id']
    except (KeyError, IndexError, TypeError, ValueError, AttributeError):
        return None
def musicbrainz_search_release(artist, release_name):
    """Look up a release's MusicBrainz id by artist name and release title.

    The artist is resolved to its MusicBrainz id first; the release search is
    then restricted to that artist id.

    Returns the id of the first matching release, or ``None`` when either the
    artist or the release cannot be found.
    """
    sleep(1)
    artist_id = musicbrainz_get_artist_id(artist)
    if artist_id is None:  # the artist lookup found nothing
        return None
    try:
        # The title is sent as a quoted phrase, so backslashes and double
        # quotes inside it must be escaped or the query becomes malformed.
        phrase = '"' + release_name.replace('\\', '\\\\').replace('"', '\\"') + '"'
        response = get('http://musicbrainz.org/ws/2/release',
                       params={'query': phrase + ' AND arid:' + artist_id,
                               'limit': 1, 'fmt': 'json'})
        return loads(response.text)['releases'][0]['id']
    except (KeyError, IndexError, TypeError, ValueError, AttributeError):
        return None
def musicbrainz_release_details(mbid_release):
    """Fetch the details of a release by its MusicBrainz id.

    Sleeps one second, then requests the release together with its artist
    credits, labels and recordings.  Returns the decoded JSON response.
    """
    sleep(1)
    endpoint = 'http://musicbrainz.org/ws/2/release/' + mbid_release
    query = {'inc': 'artist-credits+labels+recordings', 'fmt': 'json'}
    response = get(endpoint, params=query)
    return loads(response.text)
def get_coverart(data, lastfm=False):
    """Locate a cover image and report its size.

    By default ``data`` is a MusicBrainz release id and the front cover is
    looked up on Cover Art Archive by following two redirects.  With
    ``lastfm=True``, ``data`` is a Last.fm payload whose ``'image'`` list is
    searched for a non-empty ``'extralarge'`` entry.

    Returns a ``(url, size)`` tuple, where ``size`` is the Content-Length in
    whole kilobytes as a string, or ``None`` when no image is available.
    """
    if lastfm:  # Last.fm mode
        try:
            for image in data['image']:
                if image['size'] == 'extralarge' and image['#text'] != '':
                    url = image['#text']
                    return url, str(int(head(url).headers['Content-Length']) // 1024)
        except (KeyError, IndexError, TypeError, ValueError):
            return None
        # No usable entry.  Return explicitly instead of falling through to
        # the Cover Art Archive lookup, which expects a release id string.
        return None
    try:
        front = 'http://coverartarchive.org/release/' + data + '/front'
        # Two hops: each HEAD response carries the next location.
        url = head(head(front).headers['Location']).headers['Location']
        return url, str(int(head(url).headers['Content-Length']) // 1024)
    except (KeyError, IndexError, TypeError, ValueError):
        return None
def gettopalbums(page):
    """Fetch one page of the configured user's top albums from Last.fm.

    Requests the overall period with up to 1000 albums per page and returns
    the decoded JSON response.
    """
    query = {
        'format': 'json',
        'method': 'user.gettopalbums',
        'api_key': api_key,
        'user': user,
        'period': 'overall',
        'limit': '1000',
        'page': page,
    }
    response = get('http://ws.audioscrobbler.com/2.0/', params=query)
    return loads(response.text)
def albumgetinfo(mbid_lastfm):
    """Fetch Last.fm details of an album identified by its mbid.

    Returns the decoded JSON response of ``album.getInfo``.
    """
    query = {
        'format': 'json',
        'method': 'album.getInfo',
        'api_key': api_key,
        'user': user,
        'mbid': mbid_lastfm,
    }
    response = get('http://ws.audioscrobbler.com/2.0/', params=query)
    return loads(response.text)
def artistgetinfo(mbid_artist):
    """Fetch Last.fm details of an artist identified by its mbid.

    Returns the decoded JSON response of ``artist.getInfo``.
    """
    query = {
        'format': 'json',
        'method': 'artist.getInfo',
        'api_key': api_key,
        'user': user,
        'mbid': mbid_artist,
    }
    response = get('http://ws.audioscrobbler.com/2.0/', params=query)
    return loads(response.text)
def _track_position(number):
    """Convert a track number to an integer position.

    Plain numbers are converted directly.  Vinyl-style numbers such as 'A1'
    have every letter replaced by its code point while the digits are kept,
    so 'A1' becomes 651 and 'A2' becomes 652; any other character is dropped.
    Raises ValueError when nothing usable is left.
    """
    try:
        return int(number)
    except ValueError:
        pieces = []
        for ch in number:
            if ch.isalpha():
                pieces.append(str(ord(ch)))
            elif ch.isdecimal():
                pieces.append(ch)
        return int(''.join(pieces))


def albumworker(lastfm_info):
    """Import one Last.fm top-album entry into the local database.

    ``lastfm_info`` is an album entry as returned by ``gettopalbums``; its
    ``'name'`` and ``['artist']['name']`` / ``['artist']['mbid']`` values are
    read here.

    The album is skipped (``None`` is returned) when it is already stored,
    when no matching MusicBrainz release is found, when the release has no
    parsable date, or when its track list cannot be read.  Otherwise the
    artist (if new), the album and the tracks of the first medium are
    inserted.
    NOTE(review): only ``media[0]`` is imported -- confirm multi-disc
    releases are meant to be truncated.

    Returns ``(elapsed, summary, tracks)``: the elapsed seconds as a string
    rounded to 3 places, a one-line description of the album, and a tuple of
    ``(position, title, duration)`` tuples with the duration as a string.
    """
    started = perf_counter()
    album_title = lastfm_info['name']
    artist_name = lastfm_info['artist']['name']

    # An album that is already stored must not be duplicated.
    try:
        Albums.select().join(Artists).where(
            (Albums.title == album_title) & (Artists.name == artist_name)).get()
    except Albums.DoesNotExist:
        pass
    else:
        print('Album already exist!' + ' ' + str(perf_counter() - started))
        return None

    search_release = musicbrainz_search_release(artist_name, album_title)
    if search_release is None:
        return None

    try:
        artist_image = get_coverart(artistgetinfo(lastfm_info['artist']['mbid'])['artist'], True)
    except (KeyError, IndexError, TypeError, ValueError):
        artist_image = None

    cover = get_coverart(lastfm_info, True)  # try the Last.fm image first
    musicbrainz_release = musicbrainz_release_details(search_release)
    if cover is None:  # nothing on Last.fm, fall back to Cover Art Archive
        try:
            has_front = musicbrainz_release['cover-art-archive']['front'] is True
        except (KeyError, TypeError):
            # Responses without this section simply have no cover to fetch.
            has_front = False
        if has_front:
            cover = get_coverart(search_release)

    try:
        date = parse(musicbrainz_release['date']).year
    except (KeyError, IndexError, TypeError, ValueError):
        return None

    try:
        label = musicbrainz_release['label-info'][0]['label']['name']
    except (KeyError, IndexError, TypeError, ValueError):
        label = None
    if not label:  # missing, empty or null label name
        label = 'Not on Label'

    albuminfo = artist_name + ' - ' + album_title + ' [' + str(date) + '] Label: ' + label + '.'
    if cover is not None:
        albuminfo += ' Cover: ' + cover[1] + ' kB. URL: ' + cover[0]

    try:
        artist = Artists.select().where(Artists.name == artist_name).get()
    except Artists.DoesNotExist:
        image_blob = None
        if artist_image is not None:
            image_blob = converttojpeg(get(artist_image[0]).content, artist_image[0][-3:])
        # create() returns the saved instance, so no second query is needed.
        artist = Artists.create(name=artist_name, image=image_blob)

    cover_blob = None
    if cover is not None:
        cover_blob = converttojpeg(get(cover[0]).content, cover[0][-3:])
    album = Albums.create(artist=artist, title=album_title, release_date=date,
                          label=label, cover=cover_blob)

    try:
        summary = []
        rows = []  # insert_many expects an iterable of dictionaries
        for track in musicbrainz_release['media'][0]['tracks']:
            recording = track['recording']
            length = recording['length'] if recording['length'] is not None else 0
            duration = int(length) // 1000
            position = _track_position(track['number'])
            summary.append((position, recording['title'], str(duration)))
            rows.append({'album': album, 'position': position,
                         'title': recording['title'], 'duration': duration})
        if rows:  # nothing to insert for an empty track list
            Tracks.insert_many(rows).execute()
    except (KeyError, IndexError, TypeError, ValueError):
        print(musicbrainz_release)
        return None

    return str(round(perf_counter() - started, 3)), albuminfo, tuple(summary)
if __name__ == '__main__':
    # Create the three tables, reporting the ones that are already there.
    for model in (Artists, Albums, Tracks):
        try:
            model.create_table()
        except peewee.OperationalError:
            print(model.__name__ + ' table already exists!')

    # The first page also carries the total page count, so fetch it once and
    # reuse it instead of requesting page 1 a second time inside the loop.
    first_page = gettopalbums(1)
    total_pages = int(first_page['topalbums']['@attr']['totalPages'])
    for page in range(1, total_pages + 1):
        listing = first_page if page == 1 else gettopalbums(page)
        for each_album in listing['topalbums']['album']:
            func = albumworker(each_album)
            if func is not None:
                print(func)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement