Here is an example of how to use commands.MaxConcurrency for multiple bucket types.
If you want to add more concurrencies, you will need to add more try/except blocks,
and in each one's except clause you need to release all the concurrencies
that you have already acquired.
Created February 17, 2023 21:57
import asyncio

from discord.ext import commands


class Test(commands.Cog):
    """Test cog."""

    def __init__(self):
        # We need to store the concurrencies. I am storing them on the cog, attached to self, for easy access.
        self.user_concurrency = commands.MaxConcurrency(1, per=commands.BucketType.user, wait=False)
        self.channel_concurrency = commands.MaxConcurrency(1, per=commands.BucketType.channel, wait=False)

    @commands.command()
    async def test(self, ctx: commands.Context[commands.Bot]):
        """Test command."""
        m = await ctx.send('sleeping ten seconds')
        await asyncio.sleep(10)
        await m.edit(content='slept')

    @test.before_invoke
    async def test_before(self, ctx: commands.Context[commands.Bot]):
        """This triggers before the command runs."""
        # First, we acquire the first concurrency. If it is already
        # acquired, this raises MaxConcurrencyReached because wait
        # is set to False.
        await self.user_concurrency.acquire(ctx.message)
        try:
            # Then, we acquire our second concurrency.
            await self.channel_concurrency.acquire(ctx.message)
        except commands.MaxConcurrencyReached:
            # If this second concurrency fails, we need to release the
            # first one we just acquired, since the command will not go
            # through. If we don't do this, the first one will be locked
            # forever because the after_invoke hook will not trigger.
            await self.user_concurrency.release(ctx.message)
            # Then, we re-raise the error so it propagates to the error
            # handlers and doesn't just get eaten. If we don't re-raise,
            # the command will simply run anyway, as if nothing had failed.
            raise

    @test.after_invoke
    async def test_after(self, ctx: commands.Context[commands.Bot]):
        """This triggers after the command has run."""
        # Here in the after hook, we just release both concurrencies so
        # the command can be run again by other people.
        await self.user_concurrency.release(ctx.message)
        await self.channel_concurrency.release(ctx.message)
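
As the description says, each extra concurrency needs its own try/except that releases everything acquired before it. Below is a rough sketch of what that might look like with a third concurrency. The function name `acquire_three` and the `guild` parameter are made up for illustration and are not part of the gist. To keep the sketch free of a discord.py import it catches `Exception`; inside a real cog you would probably catch `commands.MaxConcurrencyReached` as in the file above. The only assumption about the three objects is that they expose awaitable `acquire(message)` and `release(message)` methods, as `commands.MaxConcurrency` does.

```python
from typing import Any


async def acquire_three(user: Any, channel: Any, guild: Any, message: Any) -> None:
    """Acquire three concurrencies in order, undoing earlier ones on failure.

    Hypothetical helper: each argument is assumed to behave like a
    commands.MaxConcurrency instance (awaitable acquire/release).
    """
    # Nothing is held yet, so a failure here needs no cleanup.
    await user.acquire(message)

    try:
        await channel.acquire(message)
    except Exception:
        # The second one failed: release the first, then propagate.
        await user.release(message)
        raise

    try:
        await guild.acquire(message)
    except Exception:
        # The third one failed: release everything acquired so far,
        # in reverse order, then propagate.
        await channel.release(message)
        await user.release(message)
        raise
```

In the before_invoke hook this would be called with the stored concurrencies and `ctx.message`, and the after_invoke hook would then release all three.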
It won't be needed by most, but if one has enough concurrencies that managing the try-excepts becomes a pain, this can be refactored to work on a sequence of MaxConcurrency instances. The same principles apply, but I figured it was worth mentioning.
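
One possible shape for that refactor, as a minimal sketch rather than a definitive implementation: the helper names `acquire_all` and `release_all` are hypothetical. The sketch assumes only that each item has awaitable `acquire(message)` and `release(message)` methods, like `commands.MaxConcurrency`. It catches `Exception` so the block needs no discord.py import; narrowing that to `commands.MaxConcurrencyReached` would match the gist more closely.

```python
from typing import Any, Sequence


async def acquire_all(concurrencies: Sequence[Any], message: Any) -> None:
    """Acquire every concurrency in order, or none of them.

    Hypothetical helper: items are assumed to behave like
    commands.MaxConcurrency instances.
    """
    acquired = []
    try:
        for concurrency in concurrencies:
            await concurrency.acquire(message)
            # Only record it once the acquire has actually succeeded.
            acquired.append(concurrency)
    except Exception:
        # Roll back whatever was acquired so far, newest first,
        # then let the error propagate to the error handlers.
        for concurrency in reversed(acquired):
            await concurrency.release(message)
        raise


async def release_all(concurrencies: Sequence[Any], message: Any) -> None:
    """Release every concurrency, in reverse acquisition order."""
    for concurrency in reversed(concurrencies):
        await concurrency.release(message)
```

With this, the before_invoke hook could shrink to a single `await acquire_all((self.user_concurrency, self.channel_concurrency), ctx.message)`, and the after_invoke hook to the matching `release_all` call.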