Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- class Song(disnake.PCMVolumeTransformer):
- example_data = {
- "title": "",
- "requester": "",
- "artist": "",
- "trackid": 0,
- "albumid": 0,
- "image": ""
- }
- def __init__(self):
- self.queue = []
- def search_tracks(self, ctx, query):
- stats = True
- while stats:
- try:
- searcher = client.search(str(query))
- if not searcher.tracks: raise searchError("Not searched tracks")
- track_object = searcher.tracks['results'][0]
- stats = False
- text = None
- lyrics = track_object.get_supplement()
- if lyrics.lyrics: text = lyrics.lyrics.full_lyrics
- # with sqlite3.connect("database.db") as db:
- # cursor = db.cursor()
- # cursor.execute("INSERT INTO songs(name, requester, author, id, albumid, lyrics, guild, image) VALUES(?, ?, ?, ?, ?, ?, ?, ?)", (track_object['title'],ctx.author.id,track_object['artists'][0]['name'],track_object['id'],track_object['albums'][0]["id"],text,ctx.guild.id,"https://" + track_object["og_image"].replace("%%", "1000x1000"),))
- self.queue.append({
- "title": track_object["title"],
- "requester": ctx.author.id,
- "artist": track_object['artists'][0]['name'],
- "trackid": track_object["id"],
- "albumid": track_object["albums"][0]["id"],
- "image": "https://" + track_object["og_image"].replace("%%", "1000x1000")
- })
- return {
- "id": track_object["id"],
- "title": track_object["title"],
- "artist": track_object["artists"][0]["name"],
- "albumid": track_object["albums"][0]["id"],
- "image": "https://" + track_object["og_image"].replace("%%", "1000x1000")
- }
- except NetworkError:
- pass
- async def download_track(self, ctx, track):
- stats = True
- while stats:
- try:
- client.tracks(track['id'])[0].download(f"songs/{track['id']}.mp3")
- stats = False
- except NetworkError:
- pass
- async def join_channel(self, ctx):
- voice = ctx.author.voice
- if not voice: return "notChannel"
- if voice:
- await voice.channel.connect()
- for mem in voice.channel.members:
- if mem.id == 1047125592220373075:
- await mem.edit(deafen=True)
- return None
- async def leave_channel(self, ctx):
- voice_state = ctx.guild.voice_client
- if not voice_state:
- return "notState"
- if voice_state.is_connected():
- await voice_state.disconnect()
- return None
- else:
- return "notState"
- async def stop(self, ctx):
- voice_state = ctx.guild.voice_client
- if not voice_state:
- return "notState"
- if voice_state.is_connected():
- await Song.leave_channel(ctx)
- return None
- else:
- return "notState"
- async def my_after(self, ctx):
- #os.remove(f"songs/{ctx.guild.id}.mp3")
- if not ctx.guild:
- return
- voice_state = ctx.guild.voice_client
- if os.path.isfile("songs/{}.mp3".format(queue[0]["trackid"])):
- os.remove("songs/{}.mp3".format(queue[0]["trackid"]))
- self.queue.pop(0)
- if voice_state and voice_state.is_connected():
- try:
- self.queue[0]
- except:
- pass
- else:
- await Song.play(ctx)
- def init_after(self, ctx):
- asyncio.run_coroutine_threadsafe(Song.my_after(self, ctx), bot.loop)
- async def play(self, ctx):
- voice_state = ctx.guild.voice_client
- if not voice_state:
- error = await Song.join_channel(self, ctx)
- if error:
- return error
- voice_state = ctx.guild.voice_client
- if voice_state.is_playing():
- return "alreadyPlay"
- np = self.queue[0]
- if not os.path.isfile(f"songs/{np['trackid']}.mp3"):
- await Song.download_track(self, ctx, {'id': np["trackid"]})
- voice_state.play(disnake.FFmpegPCMAudio(f"songs/{np['trackid']}.mp3"), after=lambda e: Song.init_after(self, ctx))
- voice_state.is_playing()
- embed = disnake.Embed(title=np["title"],color=0x228b22)
- embed.add_field(name=lang(ctx,"Главный автор:"),value=np["artist"])
- embed.add_field(name=lang(ctx,"Предложил:"),value=f"<@{np['requester']}>")
- embed.add_field(name=lang(ctx,"Источник:"),value="<:yandexMusic:1056924402790436934> Yandex Music\n")
- embed.add_field(name=lang(ctx,"Ссылка:"),value=f"[**{lang(ctx,'Это кликабельная ссылка!')}**](https://music.yandex.ru/album/{np['albumid']}/track/{np['trackid']})")
- embed.set_thumbnail(url=np['image'])
- comps = [
- disnake.ui.Button(emoji="⏹", style=disnake.ButtonStyle.danger, custom_id="stop"),
- disnake.ui.Button(emoji="⏮", style=disnake.ButtonStyle.blurple, custom_id="replay"),
- disnake.ui.Button(emoji="⏸", style=disnake.ButtonStyle.blurple, custom_id="pause"),
- disnake.ui.Button(emoji="⏭", style=disnake.ButtonStyle.blurple, custom_id="next")
- ]
- await ctx.channel.send(embed=embed, components=comps)
- return None
- async def add_for_id(self, ctx, i):
- track_object = client.tracks([f"{i}"])[0]
- self.queue.append({
- "title": track_object["title"],
- "requester": ctx.author.id,
- "artist": track_object['artists'][0]['name'],
- "trackid": track_object["id"],
- "albumid": track_object["albums"][0]["id"],
- "image": "https://" + track_object["og_image"].replace("%%", "1000x1000")
- })
- async def skip(self, ctx):
- voice_state = ctx.guild.voice_client
- if voice_state and voice_state.is_playing():
- try:
- await voice_state.stop()
- except:
- pass
- return None
- else:
- return "notState"
- def now_playing(self, guild):
- return queue
- def construct_queue(self, guild):
- texts = []
- for info, pos in zip(self.queue, range(1, len(self.queue))):
- texts.append(f"`{pos}.` [**{info['title']}**](https://music.yandex.ru/album/{info['albumid']}/track/{info['trackid']})")
- if texts == []: return None
- return "\n".join(list(map(str, texts)))
- def parse_queue(self, guild):
- titles = []
- urls = []
- for info, pos in zip(self.queue, range(1, len(self.queue))):
- titles.append(f"{pos}. {info['title']}")
- urls.append(f"https://music.yandex.ru/album/{info['albumid']}/track/{info['trackid']}")
- return {
- "texts": titles,
- "urls": urls
- }
- def pause(self, ctx):
- voice_state = ctx.voice_client
- if voice_state.is_playing():
- voice_state.pause()
- else:
- raise VoiceStateError("don't playing!")
- def resume(self, ctx):
- voice_state = ctx.voice_client
- if not voice_state.is_playing():
- voice_state.resume()
- else:
- raise VoiceStateError("already Playing!")
- def get_rand_track(self, ctx):
- track = random.choice(client.users_likes_tracks()).fetch_track()
- return track
- class Music(commands.Cog):
- def __init__(self, bot):
- self.bot = bot
- self.song = Song()
- @commands.slash_command(name="join",description="Подключает бота к вашему каналу")
- async def _join(self, ctx):
- await ctx.response.defer()
- error = await self.song.join_channel(ctx)
- if error:
- if error == "notChannel": return await ctx.send(embed=disnake.Embed(title=f"<:wrongCheckmark:1047244133078675607>{lang(ctx,'Ошибка')}",description=lang(ctx,"Для этого необходимо войти в голосовой канал"),color=disnake.Color.red()))
- await ctx.send(embed=disnake.Embed(title=f"<:correctCheckmark:1047244074350018700> {lang(ctx,'Успешно')}",color=0x228b22))
- @commands.slash_command(name="stop",description="Остановить воспроизведение(внимание, очередь будет сохранена.)")
- async def _stop(self, ctx):
- await ctx.response.defer()
- error = await self.song.stop(ctx)
- if error:
- if error == "notState": return await ctx.send(embed=disnake.Embed(title=f"<:wrongCheckmark:1047244133078675607>{lang(ctx,'Ошибка')}",description=lang(ctx,"Я уже не подключен."),color=disnake.Color.red()))
- await ctx.send(embed=disnake.Embed(title=f"<:correctCheckmark:1047244074350018700> {lang(ctx,'Успешно')}",description=lang(ctx,"Пока-пока!"),color=0x228b22))
- @commands.slash_command(name="play",description="Воспроизвести трек")
- async def _play(self, ctx, название: str = None):
- await ctx.response.defer()
- name = название
- voice_state = ctx.guild.voice_client
- if name:
- try:
- track = self.song.search_tracks(ctx, name)
- except searchError:
- return await ctx.send(embed=disnake.Embed(title=f"<:wrongCheckmark:1047244133078675607>{lang(ctx,'Ошибка')}",description=lang(ctx,"Ничего не нашлось :("),color=disnake.Color.red()))
- else:
- text = self.song.construct_queue(ctx.guild)
- if not text:
- return await ctx.send(embed=disnake.Embed(title=f"<:wrongCheckmark:1047244133078675607>{lang(ctx,'Ошибка')}",description=lang(ctx,"Очередь на этом сервере пуста, поэтому добавьте что то в неё!"),color=disnake.Color.red()))
- if voice_state:
- if voice_state.is_playing():
- pass
- else:
- error = await self.song.play(ctx)
- else:
- error = await self.song.play(ctx)
- if error:
- if error == "notChannel": return await ctx.send(embed=disnake.Embed(title=f"<:wrongCheckmark:1047244133078675607>{lang(ctx,'Ошибка')}",description=lang(ctx,"Зайдите в голосовой канал."),color=disnake.Color.red()))
- if name:
- await ctx.send(f"{track['title']} {lang(ctx,'успешно добавлена в очередь')}!")
- else:
- await ctx.send(f"**{self.bot.user.name}** {lang(ctx,'успешно начал воспроизведение очереди')}!")
- @commands.slash_command(name="skip",description="Пропустить трек")
- async def _skip(self, ctx):
- await ctx.response.defer()
- error = await self.song.skip(ctx)
- if error == "notState": return await ctx.send(embed=disnake.Embed(title=f"<:wrongCheckmark:1047244133078675607>{lang(ctx,'Ошибка')}",description=lang(ctx,"Сейчас ничего не играет!"),color=disnake.Color.red()))
- await ctx.send(embed=disnake.Embed(title=f"<:correctCheckmark:1047244074350018700> {lang(ctx,'Успешно')}",color=0x228b22))
- # @commands.slash_command(name="now-playing",description="Что сейчас играет?")
- # async def np(self, ctx):
- # await ctx.response.defer()
- # voice_state = ctx.guild.voice_client
- # if voice_state:
- # if voice_state.is_playing():
- # infor = Song.now_playing(ctx.guild)
- # embed = disnake.Embed(title=infor['name'],color=0x228b22)
- # embed.add_field(name=lang(ctx,"Главный автор:"),value=infor['artist'])
- # embed.add_field(name=lang(ctx,"Предложил:"),value=f"<@{infor['requester']}>")
- # embed.add_field(name=lang(ctx,"Источник:"),value="<:yandexMusic:1056924402790436934> Yandex Music\n")
- # embed.add_field(name=lang(ctx,"Ссылка:"),value=f"[**{lang(ctx,'Это кликабельная ссылка!')}**]({infor['uri']})")
- # embed.add_field(name=lang(ctx,"Позиция:"),value=infor['pos'])
- # embed.set_thumbnail(url=infor['image'])
- # await ctx.send(embed=embed)
- # else:
- # await ctx.send(embed=disnake.Embed(title=f"<:wrongCheckmark:1047244133078675607>{lang(ctx,'Ошибка!')}",description=lang(ctx,'Сейчас ничего не играет'),color=disnake.Color.red()))
- # else:
- # await ctx.send(embed=disnake.Embed(title=f"<:wrongCheckmark:1047244133078675607>{lang(ctx,'Ошибка!')}",description=lang(ctx,"Сейчас ничего не играет"),color=disnake.Color.red()))
- @commands.slash_command(name="queue", description="Посмотреть очередь")
- async def queu(self, ctx):
- await ctx.response.defer()
- text = self.song.construct_queue(ctx.guild)
- if not text:
- return await ctx.send(embed=disnake.Embed(title=f"<:wrongCheckmark:1047244133078675607>{lang(ctx,'Ошибка')}",description=lang(ctx,"Очередь на этом сервере пуста!"),color=disnake.Color.red()))
- await ctx.send(embed=disnake.Embed(title=f"{lang(ctx,'Очередь сервера')} {ctx.guild.name}!",description=text, color=0x228b22))
- @commands.slash_command(name="pause",description="Поставить воспроизведение на паузу")
- async def paus_(self, ctx):
- await ctx.response.defer()
- try:
- self.song.pause(ctx.guild)
- except VoiceStateError:
- await ctx.send(embed=disnake.Embed(title=f"<:wrongCheckmark:1047244133078675607>{lang(ctx,'Ошибка')}",description=lang(ctx,"Сейчас ничего не играет"),color=disnake.Color.red()))
- else:
- await ctx.send(f"<:correctCheckmark:1047244074350018700>, {lang(ctx,'чтобы продолжить воспроизведение введите /resume')}")
- @commands.slash_command(name="resume",description="Продолжить воспроизведение")
- async def resu_(self, ctx):
- await ctx.response.defer()
- try:
- self.song.resume(ctx.guild)
- except VoiceStateError:
- await ctx.send(embed=disnake.Embed(title=f"<:wrongCheckmark:1047244133078675607>{lang(ctx,'Ошибка')}",description=lang(ctx,"Трек уже играет"),color=disnake.Color.red()))
- else:
- await ctx.send(f"<:correctCheckmark:1047244074350018700>")
- @commands.slash_command(name="replay",description="Воспроизвести трек с начала")
- async def re_(self, ctx):
- await ctx.response.defer()
- voice_state = ctx.guild.voice_client
- if voice_state and voice_state.is_playing():
- self.song.pause(ctx.guild)
- await self.song.play(ctx)
- await ctx.send("<:correctCheckmark:1047244074350018700>")
- else:
- await ctx.send("<:wrongCheckmark:1047244133078675607>" + lang(ctx, "Сейчас ничего не играет чтобы начать воспроизведение заново!"))
- @commands.slash_command(name="random-track",description="Воспроизвести рандомный трек")
- async def ra_tra(self, ctx):
- await ctx.response.defer()
- voice_state = ctx.guild.voice_client
- track = self.song.get_rand_track(ctx)
- await self.song.add_for_id(ctx, track["id"])
- if voice_state:
- if voice_state.is_playing():
- #await Song.leave_channel(ctx)
- pass
- else:
- error = await self.song.play(ctx)
- if error == "notChannel": return await ctx.send(embed=disnake.Embed(title=f"<:wrongCheckmark:1047244133078675607>{lang(ctx,'Ошибка')}",description=lang(ctx,"Зайдите в голосовой канал."),color=disnake.Color.red()))
- else:
- error = await self.song.play(ctx)
- if error == "notChannel": return await ctx.send(embed=disnake.Embed(title=f"<:wrongCheckmark:1047244133078675607>{lang(ctx,'Ошибка')}",description=lang(ctx,"Зайдите в голосовой канал."),color=disnake.Color.red()))
- await ctx.send(f"Рандомный трек успешно добавлен в очередь!")
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement