Advertisement
Guest User

Untitled

a guest
May 21st, 2018
90
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 9.83 KB | None | 0 0
  1. import peewee
  2. from requests import get, head
  3. from json import loads
  4. from time import sleep, perf_counter
  5. from io import BytesIO
  6. from PIL import Image
  7. from dateutil.parser import parse
  8.  
# SQLite database (file 'lastfm.db') that every peewee model in this module is bound to.
database = peewee.SqliteDatabase('lastfm.db')
  10.  
  11.  
class Artists(peewee.Model):
    """Artist table: a unique artist name plus an optional image blob."""

    # Artist name; declared unique.
    name = peewee.CharField(unique=True)
    # Optional image bytes (nullable); albumworker stores the result of converttojpeg here.
    image = peewee.BlobField(null=True)

    class Meta:
        # Bind this model to the module-level database.
        database = database
  18.  
  19.  
class Albums(peewee.Model):
    """Album table: artist reference, title, release year, label and optional cover blob."""

    # Foreign key to the owning Artists row.
    artist = peewee.ForeignKeyField(Artists)
    # Album title.
    title = peewee.CharField()
    # Stored as an integer; albumworker writes the release year here.
    release_date = peewee.IntegerField()
    # Label name; albumworker falls back to 'Not on Label' when none is available.
    label = peewee.CharField()
    # Optional cover image bytes (nullable); albumworker stores the result of converttojpeg here.
    cover = peewee.BlobField(null=True)

    class Meta:
        # Bind this model to the module-level database.
        database = database
  29.  
  30.  
class Tracks(peewee.Model):
    """Track table: album reference, position on the release, title and duration."""

    # Foreign key to the owning Albums row.
    album = peewee.ForeignKeyField(Albums)
    # Integer track position derived by albumworker from the release's track number.
    position = peewee.IntegerField()
    # Track title.
    title = peewee.CharField()
    # Duration as an integer; albumworker writes the recording length divided by 1000.
    duration = peewee.IntegerField()

    class Meta:
        # Bind this model to the module-level database.
        database = database
  39.  
  40.  
# Last.fm API key sent as the 'api_key' parameter of the ws.audioscrobbler.com requests below.
# NOTE(review): this credential is hard-coded in source; consider loading it from configuration.
api_key = '665896559612c89558eb569d04fb7423'
# Last.fm user name sent as the 'user' parameter of those requests.
user = 'campoviy'
  43.  
  44.  
  45. def converttojpeg(image_data, image_format):
  46.     image = Image.open(BytesIO(image_data))
  47.     with BytesIO() as f:
  48.         try:
  49.             if image_format == 'png':
  50.                 result = Image.new('RGB', image.size, (255, 255, 255))
  51.                 result.paste(image, mask=image.split()[3])
  52.                 result.save(f, format='JPEG', subsampling=0, quality=80)
  53.                 return f.getvalue()
  54.             elif image_format == 'jpg':
  55.                 image = image.resize((300, 300), Image.ANTIALIAS)
  56.                 image.save(f, format='JPEG', subsampling=0, quality=80)
  57.                 return f.getvalue()
  58.         except:
  59.             return
  60.  
  61.  
  62. # получение mbid артиста по имени
  63. def musicbrainz_get_artist_id(artist):
  64.     sleep(1)
  65.     try:
  66.         return loads(get('http://musicbrainz.org/ws/2/artist',
  67.                          params={'query': '"' + artist + '"', 'limit': 1, 'fmt': 'json'}).text)['artists'][0]['id']
  68.     except (KeyError, IndexError, TypeError, ValueError):  # в таком написании артиста нет в musicbrainz
  69.         return
  70.  
  71.  
  72. # получение mbid релиза по ID артиста и названию релиза
  73. def musicbrainz_search_release(artist, release_name):
  74.     sleep(1)
  75.     artist_id = musicbrainz_get_artist_id(artist)
  76.     if artist_id is None:  # если артист не нашёлся вышеописанным методом
  77.         return
  78.     try:
  79.         return loads(get('http://musicbrainz.org/ws/2/release',
  80.                          params={'query': '"' + release_name + '"' + ' AND arid:' + artist_id,
  81.                                  'limit': 1, 'fmt': 'json'}).text)['releases'][0]['id']
  82.     except (KeyError, IndexError, TypeError, ValueError):
  83.         pass
  84.  
  85.  
  86. # детали релиза по его mbid
  87. def musicbrainz_release_details(mbid_release):
  88.     sleep(1)
  89.     return loads(get('http://musicbrainz.org/ws/2/release/' + mbid_release,
  90.                      params={'inc': 'artist-credits+labels+recordings', 'fmt': 'json'}).text)
  91.  
  92.  
  93. # по умолчанию - получение обложки с Coverartarchive,
  94. # но по требованию - и с lastfm
  95. def get_coverart(data, lastfm=False):
  96.     if lastfm:  # режим получения обложки с lastfm
  97.         try:
  98.             for image in data['image']:
  99.                 if image['size'] == 'extralarge' and image['#text'] != '':
  100.                     return image['#text'], str(int(int(head(image['#text']).headers['Content-Length']) / 1024))
  101.         except (KeyError, IndexError, TypeError, ValueError):
  102.             return
  103.     try:
  104.         url = head(head('http://coverartarchive.org/release/' + data + '/front')
  105.                    .headers['Location']).headers['Location']
  106.         return url, str(int(int(head(url).headers['Content-Length']) / 1024))
  107.     except (KeyError, IndexError, TypeError, ValueError):
  108.         return
  109.  
  110.  
  111. # получение страницы топовых альбомов пользователя
  112. def gettopalbums(page):
  113.     return loads(get('http://ws.audioscrobbler.com/2.0/',
  114.                      params={'format': 'json', 'method': 'user.gettopalbums',
  115.                              'api_key': api_key, 'user': user, 'period': 'overall', 'limit': '1000',
  116.                              'page': page}).text)
  117.  
  118.  
  119. # детали релиза в lastfm
  120. def albumgetinfo(mbid_lastfm):
  121.     return loads(get('http://ws.audioscrobbler.com/2.0/',
  122.                      params={'format': 'json', 'method': 'album.getInfo',
  123.                              'api_key': api_key, 'user': user, 'mbid': mbid_lastfm}).text)
  124.  
  125.  
  126. # детали артиста в lastfm
  127. def artistgetinfo(mbid_artist):
  128.     return loads(get('http://ws.audioscrobbler.com/2.0/',
  129.                      params={'format': 'json', 'method': 'artist.getInfo',
  130.                              'api_key': api_key, 'user': user, 'mbid': mbid_artist}).text)
  131.  
  132.  
  133. def albumworker(lastfm_info):
  134.     time1 = perf_counter()
  135.     try:  # если альбома нашёлся, не надо его дублировать
  136.         Albums.select().join(Artists).where(
  137.             (Albums.title == lastfm_info['name']) & (Artists.name == lastfm_info['artist']['name'])).get()
  138.         print('Album already exist!' + ' ' + str(perf_counter() - time1))
  139.         return
  140.     except Albums.DoesNotExist:  # значит, альбома нет, и мы просто продолжаем дальше
  141.         pass
  142.     search_release = musicbrainz_search_release(lastfm_info['artist']['name'], lastfm_info['name'])
  143.     if search_release is None:
  144.         return
  145.     try:
  146.         artist_image = get_coverart(artistgetinfo(lastfm_info['artist']['mbid'])['artist'], True)
  147.     except (KeyError, IndexError, TypeError, ValueError):
  148.         artist_image = None
  149.     cover = get_coverart(lastfm_info, True)  # альтернативный mbid [от lastfm]
  150.     musicbrainz_release = musicbrainz_release_details(search_release)
  151.     if cover is None:  # обложка не была получена с lastfm
  152.         if musicbrainz_release['cover-art-archive']['front'] is True:
  153.             cover = get_coverart(search_release)
  154.     try:
  155.         date = parse(musicbrainz_release['date']).year
  156.         if not isinstance(date, int):
  157.             return
  158.     except (KeyError, IndexError, TypeError, ValueError):
  159.         return
  160.     try:
  161.         label = musicbrainz_release['label-info'][0]['label']['name']
  162.         if label == '':
  163.             label = 'Not on Label'
  164.     except (KeyError, IndexError, TypeError, ValueError):
  165.         label = 'Not on Label'
  166.     if cover is not None:
  167.         albuminfo = lastfm_info['artist']['name'] + ' - ' + lastfm_info['name'] + ' [' + str(
  168.             date) + '] Label: ' + label + '. Cover: ' + cover[1] + ' kB. URL: ' + cover[0]
  169.     else:
  170.         albuminfo = lastfm_info['artist']['name'] + ' - ' + lastfm_info['name'] + ' [' + str(
  171.             date) + '] Label: ' + label + '.'
  172.     try:
  173.         artist = Artists.select().where(Artists.name == lastfm_info['artist']['name']).get()
  174.     except Artists.DoesNotExist:
  175.         if artist_image is not None:
  176.             Artists.create(name=lastfm_info['artist']['name'],
  177.                            image=converttojpeg(get(artist_image[0]).content, artist_image[0][-3:]))
  178.         else:
  179.             Artists.create(name=lastfm_info['artist']['name'])
  180.         artist = Artists.select().where(Artists.name == lastfm_info['artist']['name']).get()
  181.     if cover is not None:
  182.         Albums.create(artist=artist, title=lastfm_info['name'], release_date=date, label=label,
  183.                       cover=converttojpeg(get(cover[0]).content, cover[0][-3:]))
  184.         album = Albums.select().join(Artists).where(
  185.             (Albums.title == lastfm_info['name']) & (Artists.name == artist.name)).get()
  186.     else:
  187.         Albums.create(artist=artist, title=lastfm_info['name'], release_date=date, label=label)
  188.         album = Albums.select().join(Artists).where(
  189.             (Albums.title == lastfm_info['name']) & (Artists.name == artist.name)).get()
  190.     try:
  191.         tup = ()
  192.         dicts = []  # insert_many ожидает iterable of dictonaries
  193.         for tracks in musicbrainz_release['media'][0]['tracks']:
  194.             # length = tracks['recording']['length']
  195.             length = tracks['recording']['length'] if tracks['recording']['length'] is not None else '0'
  196.             curr_track = tracks['number']
  197.             try:
  198.                 curr_track = int(curr_track)
  199.             except ValueError:  # виниловые номера треков, например, 'A1'
  200.                 replaced = ''
  201.                 for ch in curr_track:
  202.                     if ch.isalpha():
  203.                         replaced += str(ord(ch))
  204.                 curr_track = int(replaced)
  205.             tup += ((curr_track, tracks['recording']['title'], str(int(int(length) / 1000))),)
  206.             dicts.append({'album': album, 'position': curr_track, 'title': tracks['recording']['title'],
  207.                           'duration': int(int(length) / 1000)})
  208.         Tracks.insert_many(dicts).execute()
  209.     except (KeyError, IndexError, TypeError, ValueError):
  210.         print(musicbrainz_release)
  211.         return
  212.     return str(round(perf_counter() - time1, 3)), albuminfo, tup
  213.  
  214.  
  215. if __name__ == '__main__':
  216.     try:
  217.         Artists.create_table()
  218.     except peewee.OperationalError:
  219.         print('Artists table already exists!')
  220.     try:
  221.         Albums.create_table()
  222.     except peewee.OperationalError:
  223.         print('Albums table already exists!')
  224.     try:
  225.         Tracks.create_table()
  226.     except peewee.OperationalError:
  227.         print('Tracks table already exists!')
  228.     for page in range(1, int(gettopalbums(1)['topalbums']['@attr']['totalPages']) + 1):
  229.         for each_album in gettopalbums(page)['topalbums']['album']:
  230.             func = albumworker(each_album)
  231.             if func is not None:
  232.                 print(func)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement