Created
November 17, 2018 02:27
-
-
Save pdtran3k6/1d68c7cab2e294b7c873ce0ef940a315 to your computer and use it in GitHub Desktop.
Refactor suggestion for testability and readability
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
... | |
public static void main(String[] args) { | |
long started = System.currentTimeMillis(); | |
boolean succeeded = runTonyAM(args); | |
long completed = System.currentTimeMillis(); | |
// By this time jobDir should have been set | |
HistoryFileUtils.createHistoryFile(fs, | |
TonyJobMetadata.newInstance(yarnConf, appIdString, started, completed, succeeded), jobDir); | |
if (succeeded) { | |
LOG.info("Application Master completed successfully. exiting"); | |
System.exit(0); | |
} else { | |
LOG.info("Application Master failed. exiting"); | |
System.exit(-1); | |
} | |
} | |
public static boolean runTonyAM(String[] args) { | |
TonyApplicationMaster am = new TonyApplicationMaster(); | |
if (!am.init(args)) { | |
return false; | |
} | |
if (!am.prepare()) { | |
return false; | |
} | |
am.mainThread = Thread.currentThread(); | |
boolean succeeded; | |
do { | |
// Crash AM on purpose during AM crash tests. | |
String shouldCrash = System.getenv(Constants.TEST_AM_CRASH); | |
if (shouldCrash != null && shouldCrash.equals("true")) { | |
LOG.fatal("Error running TonyApplicationMaster !!"); | |
return false; | |
} | |
try { | |
am.start(); | |
} catch (Exception e) { | |
LOG.error("Exception when we're starting TonyAM", e); | |
return false; | |
} | |
succeeded = am.monitor(); | |
if (succeeded || am.amRetryCount == 0) { | |
LOG.info("Result: " + succeeded + ", retry count: " + am.amRetryCount); | |
break; | |
} | |
// Prepare for retryCount. | |
am.reset(); | |
LOG.info("Retrying, remaining retry count" + am.amRetryCount); | |
am.amRetryCount -= 1; | |
} while (!am.singleNode); // We don't retry on single node training. | |
// Wait for the worker nodes to finish (The interval between registering the exit code to final exit) | |
am.stop(); | |
am.printTaskUrls(); | |
return succeeded; | |
} | |
.... | |
/** | |
* Monitor the TensorFlow training job. | |
* @return if the tensorflow job finishes successfully. | |
*/ | |
private boolean monitor() { | |
... | |
while (true) { | |
/* | |
All of these will be checked in session.updateSessionStatus(), | |
so we can just break the loop without having | |
to set FinalStatus or return right away. | |
*/ | |
if (System.currentTimeMillis() > expireTime) { | |
LOG.error("Application times out."); | |
break; | |
} | |
// Check if client signals we should exit. | |
if (clientSignalToStop) { | |
LOG.info("Client signals AM to exit."); | |
break; | |
} | |
if (session.isTrainingFinished()) { | |
LOG.info("Training has finished."); | |
break; | |
} | |
if (preprocessExitCode != 0) { | |
LOG.info("Preprocess failed with exit code: " + preprocessExitCode); | |
break; | |
} | |
if (singleNode && preprocessFinished) { | |
LOG.info("Single node training finished with exit code: " + preprocessExitCode); | |
break; | |
} | |
/* | |
This has to be set in onTaskDeemedDead(TonyTask task) since | |
it won't be checked by session.updateSessionStatus() | |
*/ | |
if (this.taskHasMissesHB) { | |
LOG.info("Application failed due to missed heartbeats"); | |
break; | |
} | |
if (numCompletedWorkerTasks.get() == numTotalWorkerTasks) { | |
// minor refactoring of logging num tasks finished. | |
Utils.printWorkerTasksCompleted(numCompletedWorkerTasks, numTotalWorkerTasks); | |
break; | |
} | |
Utils.printWorkerTasksCompleted(numCompletedWorkerTasks, numTotalWorkerTasks); | |
// Pause before refresh job status | |
try { | |
Thread.sleep(5000); | |
} catch (InterruptedException e) { | |
LOG.error("Thread interrupted", e); | |
} | |
} | |
session.updateSessionStatus(); // this will set FinalStatus accordingly | |
Utils.printWorkerTasksCompleted(numCompletedWorkerTasks, numTotalWorkerTasks); | |
FinalApplicationStatus status = session.getFinalStatus(); | |
String appMessage = session.getFinalMessage(); | |
if (status != FinalApplicationStatus.SUCCEEDED) { | |
LOG.info("Tony session failed: " + appMessage); | |
} | |
return status == FinalApplicationStatus.SUCCEEDED; | |
} | |
.... | |
private void onTaskDeemedDead(TonyTask task) { | |
LOG.info("Task with id [" + task.getId() + "] has missed" | |
+ " [" + maxConsecutiveHBMiss + "] heartbeats.. Ending application !!"); | |
String msg = "Task with id [" + task.getId() + "] deemed dead!!"; | |
LOG.error(msg); | |
taskHasMissesHB = true; | |
session.setFinalStatus(FinalApplicationStatus.FAILED, msg); // set FinalStatus here as well | |
mainThread.interrupt(); | |
} | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
... | |
/**
 * Walks the entries of {@code jobTasks}, skipping the PS job, unless the final
 * status is already FAILED, in which case it returns without doing anything.
 *
 * NOTE(review): the per-task handling is elided ("...") in this excerpt, so
 * how failureCount and the task arrays are used cannot be seen from here.
 */
public void updateSessionStatus() { | |
int failureCount = 0; | |
// Leave an already-recorded FAILED status untouched.
if (getFinalStatus() == FinalApplicationStatus.FAILED) return; // short circuit in case FinalStatus was already set
for (Map.Entry<String, TonyTask[]> entry : jobTasks.entrySet()) { | |
String jobName = entry.getKey(); | |
TonyTask[] tasks = entry.getValue(); | |
if (jobName.equals(PS_JOB_NAME)) { | |
// The PS job is ignored here.
continue; | |
} | |
... | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment