Created
March 8, 2018 12:55
-
-
Save pavloo/29f3bc1b755f79c14d673a5370de076c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def _is_node_rdy(task, graph): | |
tasks = session.query(Task).filter(Task.id.in_(list(graph.predecessors(task.id)))).all() | |
for dep_task in tasks: | |
if not dep_task.celery_task_uid or \ | |
not AsyncResult(dep_task.celery_task_uid).state == SUCCESS: | |
return False | |
return True | |
@app.task(bind=True) | |
def run(self, workflow_id, cur_task_id=None): | |
print('Runnning Workflow {} and Task {}'.format(workflow_id, cur_task_id)) | |
workflow = session.query(Workflow).filter_by(id=workflow_id).one() | |
graph = workflow.execution_graph | |
next_task_ids = [] | |
if cur_task_id: | |
task = session.query(Task).get(cur_task_id) | |
if not _is_node_rdy(task, graph): | |
return | |
# do some business logic specific thing here | |
# if it's CI workflow, it would be eg. running tests | |
_process_task_node(task, self.request.id) | |
next_task_ids = list(graph.successors(cur_task_id)) | |
else: | |
next_task_ids = find_entry_point(graph) | |
# prematurely update task's status here, because we should have it | |
# when the next task is running, and since it's a recursive call | |
# to run the next task, the status might not be updated automatically | |
# when we run it | |
self.update_state(state=SUCCESS) | |
for task_id in next_task_ids: | |
run.apply_async( | |
args=(workflow_id, task_id,), | |
queue=QUEUE_NAME | |
) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment