@jheld
Last active January 29, 2025 19:41
celery purge revocation control command
from celery.worker.control import control_command
from celery.worker import state as worker_state


@control_command(
    args=[('n', float)],
    signature='[N=0]',  # <- used for help on the command-line.
)
def purge_revoked(state, n=0):
    if n:
        worker_state.revoked.purge(n)
    else:
        worker_state.revoked.clear()
    return {'ok': 'purged all revoked task info.'}


@control_command(
    args=[('n', int)],
    signature='[N={}]'.format(worker_state.REVOKES_MAX),  # <- used for help on the command-line.
)
def revokes_max(state, n=worker_state.REVOKES_MAX):
    worker_state.revoked.maxlen = n
    worker_state.revoked.purge()
    return {'ok': 'updated revoked task max length.'}


@control_command(
    args=[('n', int)],
    signature='[N={}]'.format(worker_state.REVOKE_EXPIRES),  # <- used for help on the command-line.
)
def revokes_expires(state, n=worker_state.REVOKE_EXPIRES):
    worker_state.revoked.expires = n
    worker_state.revoked.purge()
    return {'ok': 'updated revoked task expiration.'}
@frnidito

Hi -- I have seen celery/kombu#294 (comment) and arrived here, but I'm really unable to understand what the code is actually doing. Could you please explain what this code does and how?

@jheld
Author

jheld commented Aug 6, 2022

Sure.

These functions all manage, in various ways, the given worker's revoked-task collection: the in-memory data structure recording revoked task IDs. Revoked tasks here are the same concept described in the standard Celery (and Flower) documentation. That is, if a task was revoked, you want high confidence that no worker will re-execute it (i.e. the same task with the same UUID), even if a worker comes up later, sees, for instance, a retry entry for that task in the broker, and pulls it down to execute. The full lifecycle of revoked tasks is beyond the scope of this script, but I wanted to provide some background.

By default, each worker holds in memory the record of all revoked task IDs (technically, since it booted). These IDs are propagated from each worker to all other workers, so every worker learns the updated set. In a rolling deployment (a code update where new workers come up while some of the old ones are about to go down), the old workers share their "wisdom" about the revoked "no-no, please do not run" tasks with the newcomers. But if all workers are shut down simultaneously and then brought back up, no worker remembers any revoked tasks, because no one was around who remembered that history.

So, purge_revoked instructs the worker to purge/pop/remove n entries from its revoked-task memory. If a falsey n is provided (such as 0, the default), the revoked data structure is cleared entirely. Which entries get purged is decided by the revoked-task data structure's implementation and is thus outside the scope of this gist (I do know how it works, but again, it's not super relevant here).

revokes_max instructs the worker to adjust the maximum size of its revoked data structure (say the max was 50,000 task IDs to remember; you could change it to 1!). It also calls purge: if you reduce maxlen, you want excess entries evicted immediately, and even if you increase it, the data structure may still flush on a time basis (not just a max limit, but dropping entries past a certain age threshold), so calling purge does no harm -- it's just the standard maintenance operation.
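To make the maxlen behavior concrete, here is a simplified stand-in (a plain `collections.deque`, not Celery's actual `LimitedSet`) showing that shrinking the limit evicts the oldest entries:

```python
from collections import deque

# Simplified model of the worker's bounded revoked-task memory.
revoked = deque(('task-%d' % i for i in range(5)), maxlen=5)


def shrink_revokes_max(revoked, n):
    # Rebuilding with a smaller maxlen keeps only the n newest entries,
    # mirroring what revokes_max + purge achieve on the worker.
    return deque(revoked, maxlen=n)


revoked = shrink_revokes_max(revoked, 2)
print(list(revoked))  # ['task-3', 'task-4']
```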

revokes_expires sets the time threshold I was referring to above, and that is also why it calls purge.
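A simplified sketch of the time-threshold idea (Celery's data structure tracks when each ID was recorded and drops entries older than `expires` seconds; this model keeps only that notion, with illustrative timestamps):

```python
# id -> timestamp of when the revocation was recorded (illustrative values)
revoked = {'task-a': 100.0, 'task-b': 190.0}


def purge_expired(revoked, expires, now):
    """Keep only entries newer than `expires` seconds, like a time-based purge."""
    return {tid: ts for tid, ts in revoked.items() if now - ts <= expires}


print(purge_expired(revoked, expires=60.0, now=200.0))  # {'task-b': 190.0}
```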

I'm happy to add comments throughout the gist as well!

@ulysses-bmll

@jheld Hi, sorry to revive this old thread. May I just ask, how did you call these remote control commands? Was it periodically from an external scheduled process?

@jheld
Author

jheld commented Jan 29, 2025

I think it was primarily part of a script within continuous deployment, run before deploying new code. We also tended to opt for scaling the workers to zero before starting a roll-out. A large part of the issue these commands address is that new workers coming up would simply add to the memory issue, so if we had scaled all workers entirely down before adding any new ones, the general issue mostly goes away. But yes, you could also run these at whatever cadence you're comfortable with.
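For a CD step, the commands are also reachable from the CLI, since custom remote control commands are dispatched by `celery control`. A hypothetical pre-deploy invocation, assuming an app named `proj` (a placeholder) that imports the module defining these commands:

```shell
# Clear every worker's revoked-task memory before rolling out new code.
celery -A proj control purge_revoked 0 --timeout 5
```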
