|
import discord # type: ignore |
|
|
|
import slash_util |
|
from player import PlayerMixin, Player, Song |
|
|
|
from io import BytesIO |
|
import re |
|
import asyncio |
|
|
|
# Guild id handed to every ``slash_command`` decorator and to
# ``delete_all_commands`` in this file.
GUILD_ID: int = 123

# Token given to ``bot.run``; the value here is a placeholder.
TOKEN: str = "your.bot.token"
|
|
|
class _Context(slash_util.Context):
    """Context subclass that exposes the guild's player as ``ctx.player``."""

    @property
    def player(self):
        """Return the result of ``bot.get_player`` for the interaction's guild.

        The commands in this file treat a ``None`` result as "no player".
        """
        lookup = self.bot.get_player
        return lookup(self.interaction.guild)
|
|
|
# Swap slash_util's ``Context`` for the subclass above — presumably so the
# library builds ``_Context`` objects for commands; confirm against slash_util.
slash_util.Context = _Context
|
|
|
# Matches https links only: ``youtube.com/watch?v=<something>`` (with an
# optional ``www.``) or ``youtu.be/<something>``.
youtube_regex = re.compile(r"https:\/\/(?:www\.)?youtu(?:be\.com\/watch\?v=.+$|\.be\/.+$)")
|
|
|
def progress_bar(percent: float) -> str:
    """Render a 20-character progress bar with a knob at *percent*.

    The bar has 20 slots of 5% each. The knob occupies the slot that
    *percent* falls into, and 0-4% shares the first slot.

    Args:
        percent: Progress as a percentage. Values outside 0..100 are
            clamped so the bar always stays exactly 20 characters wide.

    Returns:
        The bar as a string of ``―`` characters with one ``🔘`` knob.
    """
    slots = 20
    # Floor to the 5% step, then clamp: without the clamp a value above 100
    # yields a 21-character bar and a negative value pushes the knob off the
    # left edge while lengthening the tail.
    position = max(1, min(slots, int(percent // 5)))
    return ("―" * (position - 1)) + "🔘" + ("―" * (slots - position))
|
|
|
|
|
def to_duration(duration: int) -> str:
    """Format a number of seconds as ``M:SS`` or ``H:MM:SS``.

    Hours are shown only when non-zero. Once a larger unit is shown, every
    smaller unit is zero-padded to two digits, so 3605 seconds renders as
    ``1:00:05`` rather than the ambiguous ``1:05``.

    Args:
        duration: Length in seconds. Fractional values are truncated and
            negative values are treated as zero.

    Returns:
        The formatted duration string.
    """
    total_seconds = max(int(duration), 0)
    minutes, seconds = divmod(total_seconds, 60)
    hours, minutes = divmod(minutes, 60)

    if hours:
        return f"{hours}:{minutes:02d}:{seconds:02d}"
    return f"{minutes}:{seconds:02d}"
|
|
|
|
|
class MusicCog(slash_util.ApplicationCog):
    """Slash commands that drive the guild's music player.

    Every command replies with an ephemeral message. When a player exists
    for the guild, the invoking context is also passed to
    ``player.update_interaction``.
    """

    async def _respond(self, ctx, content: str) -> None:
        """Send *content* as an ephemeral reply, then refresh the player's interaction.

        ``ctx.player`` can be ``None`` (the commands below check for it), so
        ``update_interaction`` is only called when a player is present,
        rather than raising ``AttributeError`` after the reply is sent.
        """
        await ctx.interaction.response.send_message(content=content, ephemeral=True)
        player = ctx.player
        if player is not None:
            player.update_interaction(ctx)

    @slash_util.slash_command(guild_id=GUILD_ID, name="play", description="Plays a song")
    @slash_util.describe(url="The YouTube url", volume="The volume that the song should be (%)")
    async def _play(self, ctx, url: str, volume: int = 100):
        """Queue the song at *url*, creating or moving the player as needed.

        The reply is deferred first and edited afterwards. *volume* is a
        percentage and is passed to ``Song.from_url`` as a fraction.
        """
        interaction = ctx.interaction
        await interaction.response.defer(ephemeral=True)
        message = await interaction.original_message()

        if not youtube_regex.match(url):
            await message.edit(content="That is not a valid YouTube URL.")
            return

        player = ctx.player
        if player is None:
            player = await Player.create(ctx)
        else:
            # The invoker may not be in a voice channel at all; bail out
            # instead of dereferencing ``None``.
            voice_state = interaction.user.voice  # type: ignore
            if voice_state is None or voice_state.channel is None:
                await message.edit(content="You are not connected to a voice channel.")
                return

            if player.voice_client.channel.id != voice_state.channel.id:  # type: ignore
                # Follow the invoker: stop playback, then switch channels.
                player.voice_client.stop()
                await player.voice_client.move_to(voice_state.channel)  # type: ignore

        # ``Player.create`` may yield no player; nothing more to do then.
        if player is None:
            return

        player.update_interaction(ctx)
        song = await Song.from_url(player, url, volume / 100)  # type: ignore
        try:
            player.queue.put_nowait(song)
            await message.edit(content=f"🎵 Added `{song.title}` to the queue.")
        except asyncio.QueueFull:
            await message.edit(content="The queue is full!")

    @slash_util.slash_command(guild_id=GUILD_ID, name="current", description="See what song is currently playing")
    async def _current(self, ctx):
        """Show the current song with its elapsed time, progress bar and length."""
        player = ctx.player
        if player is None or player.current is None:
            content = "I am not playing anything!"
        else:
            song = player.current
            # A zero duration would otherwise raise ZeroDivisionError.
            percent = (song.seconds_played / song.duration) * 100 if song.duration else 0
            progress = progress_bar(percent)
            content = (
                f"🎵 Currently playing `{song.title}`.\n"
                f"`{to_duration(song.seconds_played)}` {progress} `{to_duration(song.duration)}`"
            )

        await self._respond(ctx, content)

    @slash_util.slash_command(guild_id=GUILD_ID, name="volume", description="View or change the volume for the current song")
    @slash_util.describe(volume="The volume you want to set it to (%)")
    async def _volume(self, ctx, volume: int = None):
        """Report the current volume, or set it when *volume* (a percentage) is given."""
        player = ctx.player
        if player is None or player.current is None:
            content = "I am not playing anything!"
        elif volume is None:
            # round() rather than int(): 0.29 * 100 is 28.999... as a float,
            # which truncation would display as 28%.
            content = f"🔊 Currently the volume is `{round(player.current.source.volume * 100)}%`"
        else:
            player.set_volume(volume / 100)  # type: ignore
            content = f"🔊 I have set the volume to `{round(player.current.source.volume * 100)}%`"

        await self._respond(ctx, content)

    @slash_util.slash_command(guild_id=GUILD_ID, name="pause", description="Pauses the current song.")
    async def _pause(self, ctx):
        """Pause the current song unless nothing is playing or it is already paused."""
        player = ctx.player
        if player is None or player.current is None:
            content = "I am not playing anything!"
        elif player.voice_client.is_paused():
            content = "The current song is already paused!"
        else:
            player.voice_client.pause()
            content = f"⏸️ Paused `{player.current.title}`"

        await self._respond(ctx, content)

    @slash_util.slash_command(guild_id=GUILD_ID, name="resume", description="Resumes the current song.")
    async def _resume(self, ctx):
        """Resume the current song if it is paused."""
        player = ctx.player
        if player is None or player.current is None:
            content = "I am not playing anything!"
        elif not player.voice_client.is_paused():
            content = "The current song is not paused!"
        else:
            player.voice_client.resume()
            # Play icon here; the pause icon belongs to the pause command.
            content = f"▶️ resumed `{player.current.title}`"

        await self._respond(ctx, content)

    @slash_util.slash_command(guild_id=GUILD_ID, name="skip", description="Skips the current song.")
    async def _skip(self, ctx):
        """Skip the current song by stopping the voice client."""
        player = ctx.player
        if player is None or player.current is None:
            content = "I am not playing anything!"
        else:
            # Build the message before stopping, while ``current`` is still set.
            content = f"⏭️ Skipped `{player.current.title}`."
            player.voice_client.stop()

        await self._respond(ctx, content)

    @slash_util.slash_command(guild_id=GUILD_ID, name="queue", description="Look at your queue")
    async def _queue(self, ctx):
        """List the queued songs, numbered from 1."""
        player = ctx.player
        if player is None:
            content = "I am not connected to a voice channel."
        elif player.queue.qsize() == 0:
            content = "The queue is empty!"
        else:
            content = "📋 Here is your queue:\n" + "\n".join(
                f"{n}. `{song.title}`" for n, song in enumerate(player.get_queue(), start=1)
            )

        await self._respond(ctx, content)

    @slash_util.slash_command(guild_id=GUILD_ID, name="remove", description="Removes an item from the queue by its index")
    @slash_util.describe(index="The index of the song you want to remove.")
    async def _remove(self, ctx, index: int):
        """Remove the queue entry at *index*, reporting an unknown index to the user."""
        player = ctx.player
        if player is None:
            content = "I am not connected to a voice channel."
        elif player.queue.qsize() == 0:
            content = "The queue is empty!"
        else:
            try:
                item = player.remove_item(index)
            except IndexError:
                content = "That index does not exist."
            else:
                content = f"❌ Removed `{item.title}`."

        await self._respond(ctx, content)

    @slash_util.slash_command(guild_id=GUILD_ID, name="stop", description="Stops the player.")
    async def _stop(self, ctx):
        """Stop playback and destroy the guild's player."""
        player = ctx.player
        if player is None or player.current is None:
            await ctx.interaction.response.send_message(content="I am not playing anything!", ephemeral=True)
            return

        await ctx.interaction.response.send_message(content="Stopping the player", ephemeral=True)
        player.update_interaction(ctx)

        await player.destroy("invokation of `stop`.")
|
|
|
|
|
class Bot(slash_util.Bot, PlayerMixin):
    """slash_util bot combined with ``PlayerMixin``; registers ``MusicCog`` on creation."""

    def __init__(self, **kwargs) -> None:
        """Initialise both base classes, then attach the music cog."""
        super().__init__(**kwargs)
        PlayerMixin.__init__(self)

        music_cog = MusicCog(self)
        self.add_cog(music_cog)

    async def start(self, token: str, *, reconnect: bool = True) -> None:
        """Log in, reset and sync the guild's slash commands, then connect.

        Args:
            token: Bot token forwarded to ``login``.
            reconnect: Forwarded to ``connect``.
        """
        await self.login(token)

        # The application id is set by hand before the command calls below —
        # presumably they need it; confirm against slash_util.
        application = await self.application_info()
        self._connection.application_id = application.id

        # Clear the commands for GUILD_ID first, then sync.
        await self.delete_all_commands(guild_id=GUILD_ID)
        await self.sync_commands()

        await self.connect(reconnect=reconnect)

    async def on_ready(self):
        """Print the logged-in user."""
        print(f"logged in as {self.user!s}")

    async def close(self):
        """Close the player mixin first, then the bot itself."""
        await PlayerMixin.close(self)
        outcome = await super().close()
        return outcome
|
|
|
# Module-level bot instance; "!" is passed as the command prefix.
bot = Bot(command_prefix="!")

# Start the bot only when this file is executed as a script, so importing the
# module (for tooling or tests) does not log in and block on ``run``.
if __name__ == "__main__":
    bot.run(TOKEN)