Created
June 23, 2019 14:05
-
-
Save huogerac/982d98135aa56bbd60b2de59bf2bdf88 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Celery Tasks | |
============ | |
This file defines the expected API. | |
""" | |
import jsonschema | |
from celery import Celery, subtask, group, chord | |
from . import settings | |
# Celery App | |
app = Celery(broker=settings.BROKER_URL, backend=settings.RESULT_BACKEND_URL) | |
@app.task(bind=True, name='hello.double_value') | |
def double_value(self, number: int): | |
if not isinstance(number, int): | |
raise RuntimeError("Number is not int!") | |
return number * 2 | |
@app.task(bind=True, name='hello.double_callback') | |
def double_callback(self, number: int): | |
universe_life_and_everything = 42 | |
if number != universe_life_and_everything: | |
raise RuntimeError("The number is not 42!") | |
return number | |
def send_notification(): | |
pass | |
@app.task(bind=True, name='hello.handle_errors') | |
def handle_errors(self, value): | |
send_notification(value) | |
return True | |
@app.task(bind=True, name='hello.add_value') | |
def add_value(self, number: int, original_value): | |
return original_value + number | |
@app.task(bind=True, name='hello.sum_values') | |
def sum_values(self, values): | |
return sum(values) | |
@app.task(bind=True, name='hello.is_even') | |
def is_even(self, number): | |
return number % 2 == 0 | |
@app.task(bind=True, name='hello.map_extract_text') | |
def get_items_and_double_them(self, seed, total): | |
""" | |
FUNCIONA | |
Contra: | |
""" | |
items = get_list_of_items(seed, total) | |
signature = double_value.map(items) | |
return signature | |
@app.task | |
def dmap(it, callback, final=None): | |
""" creates a new task for each item from it """ | |
callback = subtask(callback) | |
run_in_parallel = group(callback.clone([arg, ]) for arg in it) | |
if len(run_in_parallel.tasks) == 0: | |
return [] | |
if final: | |
return chord(run_in_parallel)(final) | |
return run_in_parallel.delay() | |
@app.task | |
def get_list_of_items(seed=42, total=10): | |
import random | |
random.seed(seed) | |
numbers = random.sample(range(1,100), total) | |
return numbers | |
@app.task | |
def get_summary(items): | |
return { | |
"sum": sum(items), | |
"min": min(items), | |
"max": max(items), | |
"mean": sum(items)/len(items), | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pytest | |
import mock | |
from hello_celery import tasks | |
def test_should_create_task_dynamically_1(celery_app): | |
""" | |
Problema: | |
Dado uma lista de resultados da Task1, | |
Quero Executar a Task2 para cada item da Task1 (usando fila) | |
Solucao 1: | |
Criar uma Task3 que executa a Task1 e faz um map do resultado | |
com as Task2 | |
""" | |
signature = tasks.get_items_and_double_them( | |
seed=42, total=8 | |
).delay() | |
result = signature.get() | |
assert result == [164, 30, 8, 190, 72, 64, 58, 36] | |
def test_should_create_task_dynamically_2(celery_app): | |
""" | |
Problema: | |
Dado uma lista de resultados da Task1, | |
Quero Executar a Task2 para cada item da Task1 (usando fila) | |
Solução: | |
Cria uma Task3 (dynamic map) que recebe a Task2 via parametro | |
""" | |
signature = ( | |
tasks.get_list_of_items.s(seed=42, total=8) | | |
tasks.dmap.s( | |
callback=tasks.double_value.s(), | |
) | |
).delay() | |
# Nao sei pq tem que fazer 2 gets? | |
result = signature.get().get() | |
assert result == [164, 30, 8, 190, 72, 64, 58, 36] | |
def test_should_create_task_dynamically_with_chord(celery_app): | |
""" | |
Problema: | |
Dado uma lista de resultados da Task1, | |
Quero Executar a Task2 para cada item da Task1 (usando fila) | |
""" | |
dag = ( | |
tasks.get_list_of_items.s(42, 8) | | |
tasks.dmap.s( | |
callback=tasks.double_value.s(), | |
final=tasks.get_summary.s(), | |
) | |
).delay() | |
result = dag.get().get() | |
assert result == { | |
'sum': 622, | |
'min': 8, | |
'max': 190, | |
'mean': 77.75 | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pytest | |
import mock | |
from hello_celery import tasks | |
def test_should_double_number_simple_task(celery_app): | |
signature = tasks.double_value.s(21).delay() | |
result = signature.get() | |
assert result == 42 | |
def test_should_double_number_simple_task_using_queue(celery_app): | |
signature = tasks.double_value.s(21).apply_async(queue='q1') | |
result = signature.get() | |
assert result == 42 | |
def test_should_double_number_another_way(celery_app): | |
signature = celery_app.signature( | |
'hello.double_value', | |
args=(21,), | |
kwargs={}, | |
queue='q1' | |
) | |
assert signature.delay().get() == 42 | |
def test_should_execute_callback(celery_app): | |
signature = tasks.double_value.s(22) | |
signature.link(tasks.double_callback.s()) | |
with pytest.raises(Exception) as e: | |
assert signature.delay().get() == 44 | |
assert str(e.value) == "The number is not 42!" | |
def test_should_execute_raise_exception(celery_app): | |
signature = tasks.double_value.s("@") | |
with pytest.raises(Exception) as e: | |
assert str(e.value) == "Number is not int!" | |
def test_should_ignore_exception(celery_app): | |
with mock.patch('hello_celery.tasks.send_notification') as send_notification_mock: | |
signature = tasks.double_value.s("@") | |
signature.link_error(tasks.handle_errors.s()) | |
with pytest.raises(Exception) as e: | |
signature.delay().get() | |
assert send_notification_mock.assert_called_once() | |
def test_should_run_a_chain_workflow(celery_app): | |
""" | |
CHAIN (serial tasks) | |
start --> task1 --> task2 | |
note: tasks2 receives result from task1 | |
""" | |
signature = ( | |
tasks.double_value.s(21) | | |
tasks.add_value.s(100) | |
) | |
result = signature.delay().get() | |
assert result == 142 | |
def test_should_run_a_group_workflow(celery_app): | |
""" | |
GROUP (paralell tasks) | |
--- task1 | |
/ | |
start --| --- task2 | |
\\ | |
--- task3 | |
""" | |
group_tasks = [] | |
for number in [10, 20, 30]: | |
group_tasks.append( | |
tasks.add_value.s(number, 42) | |
) | |
from celery import group | |
signature = group(group_tasks) | |
result = signature.delay().get() | |
assert result == [10 + 42, 20 + 42, 30 + 42] | |
def test_should_run_a_chord_workflow(celery_app): | |
""" | |
CHORD (put all together) | |
--- task1 --- | |
/ \\ | |
start --| --- task2 --- --- task4 ---> result | |
\\ / | |
--- task3 --- | |
""" | |
group_tasks = [] | |
for number in [10, 20, 30]: | |
group_tasks.append( | |
tasks.add_value.s(number, 42) | |
) | |
from celery import group | |
signature = ( | |
group(group_tasks) | | |
tasks.sum_values.s() | |
) | |
result = signature.delay().get() | |
assert result == 52 + 62 + 72 | |
def test_should_run_a_map_workflow(celery_app): | |
""" | |
MAP (put all together) | |
start --> task1(n1) --> task1(n2) --> task1(n) ... | |
""" | |
import random | |
random.seed(42) | |
numbers = random.sample(range(1,100), 10) | |
signature = tasks.is_even.map(numbers) | |
result = signature.delay().get() | |
assert result == [ | |
True, False, True, False, True, | |
True, False, True, True, False | |
] | |
def test_should_use_all_celery_components(celery_app): | |
""" | |
chain( task group callback | |
/--- double --- | |
add --| \\ -- sum | |
\\-- double --/ | |
""" | |
from celery import group | |
signature = ( | |
tasks.add_value.s(100, 50) | | |
group([ | |
tasks.double_value.s(), | |
tasks.double_value.s(), | |
]) | | |
tasks.sum_values.s() | |
) | |
result = signature.delay().get() | |
assert result == 300 + 300 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment