from celery.worker.control import control_command
from celery.worker import state as worker_state


@control_command(
    args=[('n', float)],
    signature='[N=0]',  # <- used for help on the command-line.
)
def purge_revoked(state, n=0):
    if n:
        worker_state.revoked.purge(n)
    else:
        worker_state.revoked.clear()
    return {'ok': 'purged all revoked task info.'}


@control_command(
    args=[('n', int)],
    signature='[N={}]'.format(worker_state.REVOKES_MAX),  # <- used for help on the command-line.
)
def revokes_max(state, n=worker_state.REVOKES_MAX):
    worker_state.revoked.maxlen = n
    worker_state.revoked.purge()
    return {'ok': 'updated revoked task max length.'}


@control_command(
    args=[('n', int)],
    signature='[N={}]'.format(worker_state.REVOKE_EXPIRES),  # <- used for help on the command-line.
)
def revokes_expires(state, n=worker_state.REVOKE_EXPIRES):
    worker_state.revoked.expires = n
    worker_state.revoked.purge()
    return {'ok': 'updated revoked task expiration.'}
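A quick note on wiring these up (an editorial addition, not part of the original gist): functions decorated with `@control_command` are registered when their defining module is imported, so the worker just needs to import that module at startup. A minimal sketch, assuming the commands above live in a hypothetical `myproject/worker_commands.py` and a Redis broker:

```python
from celery import Celery

app = Celery('myproject', broker='redis://localhost:6379/0')  # broker URL is an assumption

# Importing the module runs the @control_command decorators, which register the
# handlers with the worker's control panel. The module path is hypothetical.
import myproject.worker_commands  # noqa: F401
```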
Sure.
These functions all manage, in various ways, a worker's revoked-task collection (the in-memory data structure each worker keeps). Revoked tasks are the same concept described in the standard Celery (and Flower) documentation: when a task is revoked, you want high confidence that no worker will re-execute it (i.e. the task with that same UUID), even if a worker later comes up, sees, for instance, a retry entry for it in the broker, and pulls it down to execute. The full lifecycle of revoked tasks is a bit beyond the scope of this gist, but I wanted to provide some background.
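For concreteness, a minimal sketch of revoking a task (the broker URL and UUID below are placeholders):

```python
from celery import Celery

app = Celery('myproject', broker='redis://localhost:6379/0')  # assumed broker URL

# Broadcast a revoke for this task UUID; each worker records the ID so it will
# refuse to execute the task if it later pulls it down from the broker.
app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed')
```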
By default, workers hold in memory the record of all revoked task IDs (technically, since boot). If all workers shut down simultaneously and then come back up, there will be no revoked task IDs in their memory. Why does that matter? Because revoked task IDs are propagated from each worker to all other workers, so that everyone agrees on the current set. In a rolling deployment for a code update, unless your policy is to wait for every worker to terminate completely, new workers come up while some of the "old" ones are about to go down, so the old ones share their "wisdom" about the revoked "no-no, please do not run" tasks with the newcomers. But if all workers go down first, no one is left who remembers that history, and the new workers start with an empty set.
So, `purge_revoked` instructs the worker to purge/pop/remove `n` revoked task IDs from its revoked-task memory. If a falsey `n` is provided (such as `0`, the default), the revoked data structure is cleared out entirely. Which entries get purged is determined by the implementation of the revoked-task data structure and is thus outside the scope of this gist (I do know how it works, but again, it's not super relevant here).
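As an illustration (not from the original gist), a client or deploy script could invoke the command through Celery's broadcast API, assuming the worker has the commands module loaded:

```python
from celery import Celery

app = Celery('myproject', broker='redis://localhost:6379/0')  # assumed broker URL

# n=0 (falsey) clears the whole revoked set; a positive n purges that many entries.
replies = app.control.broadcast('purge_revoked', arguments={'n': 0}, reply=True)
print(replies)  # e.g. [{'celery@worker1': {'ok': 'purged all revoked task info.'}}]
```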
`revokes_max` instructs the worker to adjust the maximum size of its revoked data structure (say the max was 50,000 task IDs to remember; you could change it to 1!). It also calls purge: if you reduce the `maxlen`, you want the excess entries dropped immediately, and even if you increase it, the data structure may also do a time-based flush (not just a size limit, but dropping the oldest entries past a time threshold), so purging does no harm -- it's just the standard maintenance operation.
`revokes_expires` sets the time threshold I was referring to above, which is also why it calls `purge`.
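The other two commands can be invoked the same way; here is a hedged sketch (the values are arbitrary examples, and the broker URL is an assumption):

```python
from celery import Celery

app = Celery('myproject', broker='redis://localhost:6379/0')  # assumed broker URL

# Cap the revoked set at 10,000 task IDs and expire entries after one hour.
app.control.broadcast('revokes_max', arguments={'n': 10000}, reply=True)
app.control.broadcast('revokes_expires', arguments={'n': 3600}, reply=True)
```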
I'm happy to add comments throughout the gist as well!
@jheld Hi, sorry to revive this old thread. May I just ask, how did you call these remote control commands? Was it periodically from an external scheduled process?
Primarily it was part of a script in our continuous deployment; I think we ran it before deploying new code. We also tended to scale the workers to 0 before starting a roll-out. A large part of the issue these commands address is that new workers coming up would simply add to the memory problem, so if we scaled all workers entirely down before adding any new ones as we deployed new code, I think the general issue goes away. But yes, you could still run this at whatever cadence you're comfortable with.
Hi -- I have seen celery/kombu#294 (comment) and I arrived here, but I am really unable to understand what the code is actually doing. Could you please provide some explanation of what this code does and how?