Celery worker blocks on rate limited task
=========================================

by github.com/miraculixx

Problem:

If a worker has a rate_limit active on some task, and instances of that
task arrive (are received) faster than the rate limit allows them to run,
the worker holds them until they are due. As soon as the held tasks use up
the prefetch count (max_concurrency * worker_prefetch_multiplier), all
worker processes are blocked on these task instances and the worker stops
consuming any other tasks.
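
The arithmetic behind the prefetch count can be sketched as follows. This is plain Python for illustration only, not Celery code; the function names `prefetch_limit` and `worker_is_blocked` are hypothetical, and the numbers are the ones used in this report (concurrency 4, multiplier 1 or the default 4).

```python
def prefetch_limit(concurrency: int, prefetch_multiplier: int) -> int:
    """Number of tasks the worker reserves before it stops consuming."""
    return concurrency * prefetch_multiplier


def worker_is_blocked(held_rate_limited: int, concurrency: int,
                      prefetch_multiplier: int) -> bool:
    """True once the held rate limited tasks use up the whole prefetch count."""
    return held_rate_limited >= prefetch_limit(concurrency, prefetch_multiplier)


# Demo setup from this report: -c 4 and worker_prefetch_multiplier = 1
print(prefetch_limit(4, 1))        # 4
# Same concurrency with the default multiplier of 4
print(prefetch_limit(4, 4))        # 16
print(worker_is_blocked(4, 4, 1))  # True
```
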

Expected:

According to the documentation and the least-surprise interpretation [1][2],
setting a rate_limit on one task should not affect any other task. At the
very least, the observed behavior should be stated as a warning in the
documentation.

Workarounds:

* In general, set max_concurrency * worker_prefetch_multiplier to a number
  larger than the number of rate limited tasks expected to arrive within
  one rate limit interval.
* Use autoscale to add worker processes dynamically if you have bursts of
  tasks. This grows and shrinks the pool of active worker processes, which
  can reduce the problem.
* Use separate workers and queues for rate limited tasks, so that the
  workers handling all other tasks are never saturated.
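
A minimal sketch of the separate-queue workaround, assuming the tasks.py from this report. The queue name `rate_limited` and the helper `queue_for` are assumptions made for illustration; the routes dictionary has the shape Celery accepts for its `task_routes` setting, and `celery` is Celery's default queue name.

```python
# Hypothetical name for the queue that only receives rate limited tasks.
RATE_LIMITED_QUEUE = "rate_limited"

# In tasks.py this dictionary would be assigned to app.conf.task_routes.
TASK_ROUTES = {
    "tasks.A": {"queue": RATE_LIMITED_QUEUE},
}


def queue_for(task_name, routes=TASK_ROUTES, default_queue="celery"):
    """Illustrative helper: which queue an exact-name route sends a task to."""
    return routes.get(task_name, {}).get("queue", default_queue)


print(queue_for("tasks.A"))  # rate_limited
print(queue_for("tasks.B"))  # celery
```

With such a route in place, one worker can be started with `-Q rate_limited` and another with `-Q celery`, so held instances of task A only count against the prefetch limit of the first worker.
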

Environment:

    $ celery -A tasks report
    software -> celery:4.4.0 (cliffs) kombu:4.6.7 py:3.7.6
                billiard:3.6.2.0 redis:3.4.1
    platform -> system:Linux arch:64bit
                kernel version:4.15.0-76-generic imp:CPython
    loader   -> celery.loaders.app.AppLoader
    settings -> transport:redis results:redis://localhost/

    broker_url: 'redis://localhost:6379//'
    result_backend: 'redis://localhost/'

How to reproduce
----------------

1. Create a sample celery app with two tasks A and B (see tasks.py)

2. Run one worker with max concurrency set to 4

    $ celery -A tasks worker --loglevel=DEBUG -c 4

3. Use the command line to rate_limit task A to 1/h

    $ celery -A tasks control rate_limit tasks.A 1/h
    -> celery@arwen: OK
        new rate limit set successfully

4. Submit task A 5 times

    # do this 5 times
    $ celery -A tasks call tasks.A --args '[2, 4]'

   Result: The worker log will show 5 "Received task" messages, but only
   the first task is executed (that's the rate limit in effect, as expected).

    [2020-02-22 14:15:26,771: INFO/ForkPoolWorker-1] Task tasks.A[ea7dcee8-ac98-4e6c-88e8-51dfd0e9cdc6] succeeded in 0.002578758983872831s: 6
    [2020-02-22 14:15:27,534: INFO/MainProcess] Received task: tasks.A[9af3010e-1f79-462f-b77d-2c56ccbb1d20]
    [2020-02-22 14:15:28,069: INFO/MainProcess] Received task: tasks.A[5f7315d5-7f53-443a-8f48-1e9365cf7c52]
    [2020-02-22 14:15:28,595: INFO/MainProcess] Received task: tasks.A[51b04b84-831e-45cb-9b73-7481f7707589]
    [2020-02-22 14:15:29,225: INFO/MainProcess] Received task: tasks.A[474f7ad4-48b6-4c26-9066-4324784e12ee]

5. Call task B

    $ celery -A tasks call tasks.B --args '[2, 4]'
    55deebde-edf5-41ae-adc1-a78b323c4b39

   Result: Task B is neither received nor executed until the next task A
   has run (at the next rate limit interval). The 4 rate limited tasks
   still held from step 4 are effectively blocking all worker processes
   from consuming any more tasks.

6. Check worker status

    # looking good
    $ celery -A tasks inspect registered
    -> celery@arwen: OK
        * tasks.A [rate_limit=1/h]
        * tasks.B
    $ celery -A tasks inspect active
    -> celery@arwen: OK
        - empty -
    $ celery -A tasks inspect scheduled
    -> celery@arwen: OK
        - empty -
    $ celery -A tasks inspect reserved
    -> celery@arwen: OK
        - empty -

7. Growing the process pool will execute task B

    $ celery -A tasks control pool_grow 1
    -> celery@arwen: OK
        pool will grow

   Result: Task B is received and run by the new worker process

    [2020-02-22 14:18:19,677: INFO/MainProcess] Received task: tasks.B[55deebde-edf5-41ae-adc1-a78b323c4b39]
    (...)
    [2020-02-22 14:18:19,678: INFO/ForkPoolWorker-1] Task tasks.B[55deebde-edf5-41ae-adc1-a78b323c4b39] succeeded in 0.0005468999734148383s: -2

8. Submitting another task A will block the new worker process too

    $ celery -A tasks call tasks.A --args '[2, 4]'
    $ celery -A tasks call tasks.B --args '[2, 4]'

   Result: Task B is again neither received nor executed
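
Steps 3 to 5 can also be driven from Python instead of the command line. The following is a sketch only: the function names `reproduce` and `wait_until_ready` are hypothetical, the timeout is an arbitrary choice, and the rate limit is sent as a broadcast, so it may take a moment before the worker applies it. The objects passed in are expected to behave like `app.control` and the tasks A and B from tasks.py.

```python
import time


def wait_until_ready(result, timeout, poll_interval=0.5):
    """Poll an AsyncResult-like object; True if it finished within `timeout` seconds."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if result.ready():
            return True
        time.sleep(poll_interval)
    return result.ready()


def reproduce(control, task_a, task_b, submissions=5, timeout=10.0):
    """Rate limit A, submit A several times, then check whether B still runs.

    Returns True if task B finished within the timeout, False if it looks blocked.
    """
    control.rate_limit(task_a.name, "1/h")  # step 3
    for _ in range(submissions):            # step 4
        task_a.delay(2, 4)
    result_b = task_b.delay(2, 4)           # step 5
    return wait_until_ready(result_b, timeout)


# Usage, with the worker from step 2 running:
#   from tasks import app, A, B
#   print(reproduce(app.control, A, B))
# Going by the observations in this report, task B would not finish in time.
```
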

References
----------

[1] https://groups.google.com/forum/#!searchin/celery-users/rate$20limit$20stops$20worker%7Csort:date/celery-users/NBw6EqXwVuA/7SK0fNCnAQAJ
[2] https://freenode.logbot.info/celery/20200221#c3282156

version: '2'
services:
  redis:
    image: redis
    ports:
      - "6379:6379"

celery==4.4.0
honcho==1.0.1
redis==3.4.1

# tasks.py
from celery import Celery

app = Celery('tasks', broker='redis://localhost', backend='redis://localhost')

# For demo purposes, restrict the prefetch multiplier to 1. With -c 4 the
# worker then holds at most 4 * 1 = 4 rate limited tasks before it gets
# blocked. Leaving the multiplier at its default value (4) means you need
# 4 * 4 = 16 tasks before the worker stops receiving new tasks. A larger
# multiplier only raises the number of rate limited tasks required to block
# the worker; it does not solve the problem (though setting it to some very
# large number, e.g. sys.maxsize, may be a workaround in some scenarios).
# https://docs.celeryproject.org/en/stable/userguide/configuration.html#worker-prefetch-multiplier
app.conf.worker_prefetch_multiplier = 1


@app.task
def A(x, y):
    return x + y


@app.task
def B(x, y):
    return x - y