DigDagでタスクが実行されるときの流れを少し調べたのでメモ 理解までは至ってないが、通知がretryされる部分に関しては見つけることができた
実行時に Starting a new session project
のメッセージをログとして出力している部分
logger.info("Starting a new session project id={} workflow name={} session_time={}",
projId, ar.getWorkflowName(), SESSION_TIME_FORMATTER.withZone(ar.getTimeZone()).format(ar.getSessionTime()));
StoredSessionAttemptWithSession storedAttemptWithSession =
StoredSessionAttemptWithSession.of(siteId, storedSession, storedAttempt);
通知もnotifyというタスクぽいので、通常のタスクが実行されるのと変わらない流れをとおる
TaskResult result = callExecutor(projectPath, type, mergedRequest);
実際の通知の送信処理を呼ぶ部分、RetryExecutorを使って失敗した場合に指定の間隔を開けて再送信を行っている
@Override
public void sendNotification(Notification notification)
throws NotificationException
{
logger.debug("Notification: {}", notification);
if (sender == null) {
return;
}
RetryExecutor retryExecutor = retryExecutor()
.retryIf(exception -> true)
.withInitialRetryWait(minRetryWait)
.withMaxRetryWait(maxRetryWait)
.onRetry((exception, retryCount, retryLimit, retryWait) -> logger.warn("Sending notification failed: retry {} of {}", retryCount, retryLimit, exception))
.withRetryLimit(retries);
try {
retryExecutor.run(() -> {
try {
sender.sendNotification(notification);
}
catch (NotificationException e) {
throw Throwables.propagate(e);
}
});
}
catch (RetryExecutor.RetryGiveupException e) {
throw new NotificationException("Sending notification failed", e);
}
}
}
呼び出されている順番が非常にわかりやすい
タスクが失敗したときのログ↓OperatorManager.callExecutor
あたり気になるな
at io.digdag.standards.operator.ShOperatorFactory$ShOperator.runTask(ShOperatorFactory.java:143)
at io.digdag.util.BaseOperator.run(BaseOperator.java:35)
at io.digdag.core.agent.OperatorManager.callExecutor(OperatorManager.java:312)
at io.digdag.core.agent.OperatorManager.runWithWorkspace(OperatorManager.java:254)
at io.digdag.core.agent.OperatorManager.lambda$runWithHeartbeat$2(OperatorManager.java:137)
at io.digdag.core.agent.ExtractArchiveWorkspaceManager.withExtractedArchive(ExtractArchiveWorkspaceManager.java:77)
at io.digdag.core.agent.OperatorManager.runWithHeartbeat(OperatorManager.java:135)
at io.digdag.core.agent.OperatorManager.run(OperatorManager.java:119)
at io.digdag.core.agent.MultiThreadAgent.lambda$null$0(MultiThreadAgent.java:127)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
catch (RuntimeException | AssertionError ex) { // Avoid infinite task retry cause of AssertionError by Operators
if (ex instanceof ConfigException) {
logger.error("Configuration error at task {}: {}", request.getTaskName(), formatExceptionMessage(ex));
}
else {
logger.error("Task failed with unexpected error: {}", ex.getMessage(), ex);
}
callback.taskFailed(request.getSiteId(),
request.getTaskId(), request.getLockId(), agentId,
buildExceptionErrorConfig(ex).toConfig(cf)); // no retry
}
return true;
});
}
catch (RuntimeException | IOException ex) {
// exception happened in workspaceManager
logger.error("Task failed with unexpected error: {}", ex.getMessage(), ex);
callback.taskFailed(request.getSiteId(),
request.getTaskId(), request.getLockId(), agentId,
buildExceptionErrorConfig(ex).toConfig(cf));
失敗したときに、taskFailedがよばれるようになっている。
@Override
public void taskFailed(int siteId,
long taskId, String lockId, AgentId agentId,
Config error)
{
tm.begin(() -> exec.taskFailed(siteId, taskId, lockId, agentId, error));
}
execは WorkflowExecutor
⭐retry failedするとルートからの実行ではないので、ココがtrueにならない気がする
if (isRootTask) {
errorTaskIds.add(addAttemptFailureAlertTask(lockedTask));
}
private long addAttemptFailureAlertTask(TaskControl rootTask)
{
Config config = cf.create();
config.set("_type", "notify");
config.set("_command", "Workflow session attempt failed");
WorkflowTaskList tasks = compiler.compileTasks(rootTask.get().getFullName(), "^failure-alert", config);
return rootTask.addGeneratedSubtasksWithoutLimit(tasks, ImmutableList.of(), false);
}