Skip to content

Instantly share code, notes, and snippets.

@peterdemin
Last active August 29, 2015 14:10
Show Gist options
  • Save peterdemin/abebf85892e248ca88a6 to your computer and use it in GitHub Desktop.
Save peterdemin/abebf85892e248ca88a6 to your computer and use it in GitHub Desktop.
celery retry task in chain of groups
Steps to reproduce:
1. apply async chain of groups of tasks;
2. raise retry inside task(s) of first group;
3. watch, how tasks of second group are finished before those in first group.
Particularly in attached logs all 'add' tasks should finish before starting of 'mul' tasks.
But as retry occurs, order does not hold anymore.
The actual order was:
Received task: add[0af...]
Received task: add[70b...]
Received task: add[635...]
Task add[0af...] succeeded in 0.15: 1
Task add[70b...] retry: Retry in 1s
Task add[635...] retry: Retry in 1s
Received task: mul[3c9...]
Received task: mul[894...]
Received task: add[635...] eta:[2014-11-28 18:44:33.514573+00:00]
Task mul[894...] retry: Retry in 1s
Received task: add[70b...] eta:[2014-11-28 18:44:33.506460+00:00]
Received task: mul[894...] eta:[2014-11-28 18:44:34.552600+00:00]
Task mul[3c9...] retry: Retry in 1s
Task add[70b...] succeeded in 0.00: 3
Task add[635...] retry: Retry in 1s
Task mul[894...] succeeded in 0.00: 56
Received task: mul[3c9...] eta:[2014-11-28 18:44:34.554050+00:00]
Received task: add[635...] eta:[2014-11-28 18:44:35.454665+00:00]
Task mul[3c9...] succeeded in 0.00: 30
Task add[635...] succeeded in 0.00: 7
import random
from celery import Celery
app = Celery(
'retry_task_in_group_in_chain',
broker='mongodb+mongodb://localhost:27017/test-temp'
)
@app.task
def add(x, y):
if random.randint(0, 1) == 0:
return x + y
else:
raise add.retry(countdown=1)
@app.task
def mul(x, y):
if random.randint(0, 1) == 0:
return x * y
else:
raise mul.retry(countdown=1)
def chain_group_of_tasks():
g1 = app.canvas.group([add.s(0, 1), add.s(1, 2), add.s(3, 4)])
g2 = app.canvas.group([mul.s(5, 6), mul.s(7, 8)])
c = app.canvas.chain(g1, g2)
c.apply_async()
if __name__ == '__main__':
chain_group_of_tasks()
$ python retry_task_in_group_in_chain.py &
[1] 21457
$ celery -A retry_task_in_group_in_chain worker --loglevel=INFO
[2014-11-28 21:44:28,958: WARNING/MainProcess] .../site-packages/celery/apps/worker.py:161: CDeprecationWarning:
Starting from version 3.2 Celery will refuse to accept pickle by default.
The pickle serializer is a security concern as it may give attackers
the ability to execute any command. It's important to secure
your broker from unauthorized access when using pickle, so we think
that enabling pickle should require a deliberate action and not be
the default choice.
If you depend on pickle then you should set a setting to disable this
warning and to be sure that everything will continue working
when you upgrade to Celery 3.2::
CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']
You must only enable the serializers that you will actually use.
warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED))
-------------- celery@ya v3.1.16 (Cipater)
---- **** -----
--- * *** * -- Linux-3.8.0-44-generic-x86_64-with-Ubuntu-12.04-precise
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: retry_task_in_group_in_chain:0x1c13110
- ** ---------- .> transport: mongodb+mongodb://localhost:27017/test-temp
- ** ---------- .> results: disabled
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ----
--- ***** ----- [queues]
-------------- .> celery exchange=celery(direct) key=celery
[tasks]
. retry_task_in_group_in_chain.add
. retry_task_in_group_in_chain.mul
[2014-11-28 21:44:29,060: INFO/MainProcess] Connected to mongodb+mongodb://localhost:27017/test-temp
[2014-11-28 21:44:29,414: WARNING/MainProcess] celery@ya ready.
[2014-11-28 21:44:32,483: INFO/MainProcess] Received task: retry_task_in_group_in_chain.add[0af893fe-76ac-451a-94f8-25aea59b3895]
[2014-11-28 21:44:32,494: INFO/MainProcess] Received task: retry_task_in_group_in_chain.add[70b6f3e4-88fa-4008-a9f7-bf4d2b8af7c8]
[2014-11-28 21:44:32,505: INFO/MainProcess] Received task: retry_task_in_group_in_chain.add[635c1464-532a-4908-b831-1a3c34ec51bd]
[2014-11-28 21:44:32,639: INFO/MainProcess] Task retry_task_in_group_in_chain.add[0af893fe-76ac-451a-94f8-25aea59b3895] succeeded in 0.154771071997s: 1
[2014-11-28 21:44:32,642: INFO/MainProcess] Task retry_task_in_group_in_chain.add[70b6f3e4-88fa-4008-a9f7-bf4d2b8af7c8] retry: Retry in 1s
[2014-11-28 21:44:32,651: INFO/MainProcess] Task retry_task_in_group_in_chain.add[635c1464-532a-4908-b831-1a3c34ec51bd] retry: Retry in 1s
[2014-11-28 21:44:33,533: INFO/MainProcess] Received task: retry_task_in_group_in_chain.mul[3c9ca792-8d14-4cc1-9d74-e0875e4a4450]
[2014-11-28 21:44:33,549: INFO/MainProcess] Received task: retry_task_in_group_in_chain.mul[8941a665-ae6a-4e2d-993b-61ba06d068e6]
[2014-11-28 21:44:33,563: INFO/MainProcess] Received task: retry_task_in_group_in_chain.add[635c1464-532a-4908-b831-1a3c34ec51bd] eta:[2014-11-28 18:44:33.514573+00:00]
[2014-11-28 21:44:33,576: INFO/MainProcess] Task retry_task_in_group_in_chain.mul[8941a665-ae6a-4e2d-993b-61ba06d068e6] retry: Retry in 1s
[2014-11-28 21:44:33,577: INFO/MainProcess] Received task: retry_task_in_group_in_chain.add[70b6f3e4-88fa-4008-a9f7-bf4d2b8af7c8] eta:[2014-11-28 18:44:33.506460+00:00]
[2014-11-28 21:44:33,590: INFO/MainProcess] Received task: retry_task_in_group_in_chain.mul[8941a665-ae6a-4e2d-993b-61ba06d068e6] eta:[2014-11-28 18:44:34.552600+00:00]
[2014-11-28 21:44:33,685: INFO/MainProcess] Task retry_task_in_group_in_chain.mul[3c9ca792-8d14-4cc1-9d74-e0875e4a4450] retry: Retry in 1s
[2014-11-28 21:44:34,454: INFO/MainProcess] Task retry_task_in_group_in_chain.add[70b6f3e4-88fa-4008-a9f7-bf4d2b8af7c8] succeeded in 0.000688284999342s: 3
[2014-11-28 21:44:34,478: INFO/MainProcess] Task retry_task_in_group_in_chain.add[635c1464-532a-4908-b831-1a3c34ec51bd] retry: Retry in 1s
[2014-11-28 21:44:34,554: INFO/MainProcess] Task retry_task_in_group_in_chain.mul[8941a665-ae6a-4e2d-993b-61ba06d068e6] succeeded in 0.000532722995558s: 56
[2014-11-28 21:44:34,615: INFO/MainProcess] Received task: retry_task_in_group_in_chain.mul[3c9ca792-8d14-4cc1-9d74-e0875e4a4450] eta:[2014-11-28 18:44:34.554050+00:00]
[2014-11-28 21:44:34,628: INFO/MainProcess] Received task: retry_task_in_group_in_chain.add[635c1464-532a-4908-b831-1a3c34ec51bd] eta:[2014-11-28 18:44:35.454665+00:00]
[2014-11-28 21:44:35,455: INFO/MainProcess] Task retry_task_in_group_in_chain.mul[3c9ca792-8d14-4cc1-9d74-e0875e4a4450] succeeded in 0.00167835899629s: 30
[2014-11-28 21:44:35,464: INFO/MainProcess] Task retry_task_in_group_in_chain.add[635c1464-532a-4908-b831-1a3c34ec51bd] succeeded in 0.000486356999318s: 7
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment