Created
July 15, 2013 22:57
-
-
Save florianleibert/6004234 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Mesos `Executor` that runs a command carried in a task's data payload.
 *
 * The payload is split into a flag string and a command by `executorArgsPattern`
 * (defined outside this class). Each command is wrapped in a `RunCommand` and
 * submitted to a fixed-size thread pool; the task's terminal state is reported
 * back to the driver through a callback.
 */
class ShellExecutor() extends Executor {
  //args|command.
  // e.g. args: -av (async job), verbose mode

  /** Pool on which submitted `RunCommand` instances run; capped at 20 threads. */
  val executorService = Executors.newFixedThreadPool(20)
  val log = Logger.getLogger(getClass.getName)

  /** Driver callback; only prints a notice. */
  def registered(executorDriver: ExecutorDriver, executorInfo: ExecutorInfo, frameworkInfo: FrameworkInfo,
                 slaveInfo: SlaveInfo): Unit = {
    println("Registered....")
  }

  /** Driver callback; only prints a notice. */
  def reregistered(p1: ExecutorDriver, p2: SlaveInfo): Unit = {
    println("Reregistered...")
    //Ignore
  }

  /** Driver callback; only prints a notice. */
  def disconnected(p1: ExecutorDriver): Unit = {
    //Ignore
    println("DISCONNECTED!")
  }

  /**
   * Reports the task as running, parses its payload and submits the command
   * to the thread pool.
   *
   * Any non-fatal failure while preparing the task (payload not matching
   * `executorArgsPattern`, flag parsing errors, ...) is logged with its stack
   * trace and reported to the driver as `TASK_FAILED`. Fatal JVM errors are
   * deliberately not intercepted.
   *
   * @param driver   driver used to send status updates
   * @param taskInfo task description; its data is read as a UTF-8 string
   */
  def launchTask(driver: ExecutorDriver, taskInfo: TaskInfo): Unit = {
    try {
      val runningStatus: TaskStatus = TaskStatus.newBuilder
        .setTaskId(taskInfo.getTaskId)
        .setState(TaskState.TASK_RUNNING)
        .build()
      driver.sendStatusUpdate(runningStatus)
      log.info(s"Running task: ${taskInfo.getTaskId}")

      val executorArgsPattern(flagString, command) = taskInfo.getData.toStringUtf8
      // Drop empty tokens so leading whitespace (or an empty/absent flag string)
      // never hands a blank argument to the parser.
      val flags: Array[String] =
        Option(flagString).fold(Array.empty[String])(_.split("\\s+").filter(_.nonEmpty))
      log.info(s"Flags received: $flagString")
      log.info(s"Command received: $command")

      val parser = new ArgotParser("internalParser")
      val retriesFlag = parser.option[Int](List("r"), "n", "total retries, 0 by default")
      parser.parse(flags)
      val retries = retriesFlag.value.getOrElse(0)
      //TODO(FL): Implement retries.
      log.info(s"Retries flag:$retries")
      log.info(s"Task id: ${taskInfo.getTaskId.getValue}")
      log.info(s"Command:$command")

      // Invoked by RunCommand with an Int result and the task id; 0 maps to
      // TASK_FINISHED, anything else to TASK_FAILED.
      // NOTE(review): the Int is presumably the command's exit code - confirm in RunCommand.
      val callback: (Int, TaskID) => Unit = { (resultCode, finishedTaskId) =>
        val finalState =
          if (resultCode == 0) TaskState.TASK_FINISHED else TaskState.TASK_FAILED
        driver.sendStatusUpdate(
          TaskStatus.newBuilder().setTaskId(finishedTaskId).setState(finalState).build())
      }

      val task = new RunCommand(List(command), taskInfo.getTaskId, callback)
      // NOTE(review): the returned Future is discarded, so an exception thrown by
      // the task itself is only visible if RunCommand reports it via the callback.
      executorService.submit(task)
    } catch {
      // NonFatal lets OutOfMemoryError, InterruptedException etc. propagate
      // instead of being masked as an ordinary task failure.
      case scala.util.control.NonFatal(t) =>
        log.log(java.util.logging.Level.WARNING, "Caught exception:" + t.getMessage, t)
        driver.sendStatusUpdate(
          TaskStatus.newBuilder()
            .setTaskId(taskInfo.getTaskId())
            .setState(TaskState.TASK_FAILED)
            .build())
    }
  }

  /** Driver callback; killing tasks is not supported, only prints a notice. */
  def killTask(p1: ExecutorDriver, p2: TaskID): Unit = {
    //Not supported
    println("KILL TASK")
  }

  /** Driver callback; framework messages are not supported, only prints a notice. */
  def frameworkMessage(p1: ExecutorDriver, p2: Array[Byte]): Unit = {
    //Not supported
    println("FRAMEWORK MESSAGE!")
  }

  /** Driver callback; prints a notice and terminates the JVM with exit code 0. */
  def shutdown(p1: ExecutorDriver): Unit = {
    println("Shutdown!")
    //No-op
    System.exit(0)
  }

  /** Driver callback; prints the error message passed by the driver. */
  def error(p1: ExecutorDriver, p2: String): Unit = {
    println("Error!" + p2)
  }
}
/**
 * Process entry point: wires a `ShellExecutor` into a `MesosExecutorDriver`,
 * runs it, and turns the driver's final status into the process exit code.
 */
object ShellExecutor {
  //TODO(FL): Fix this.
  // Set before the logger below is obtained.
  System.setProperty("java.util.logging.SimpleFormatter.format", "[%1$tc] %4$s: %5$s%n")

  val log = Logger.getLogger(getClass.getName)

  /**
   * Starts the executor driver and blocks until it returns.
   *
   * Exits with 0 when the driver reports `DRIVER_STOPPED`, and with 1 for any
   * other status.
   *
   * @param args command-line arguments, handed to `Configuration`
   */
  def main(args: Array[String]): Unit = {
    log.info("Starting shell-executor version: 0.2")
    // The instance is not used afterwards; it is still constructed so that
    // whatever Configuration does with the arguments keeps happening.
    new Configuration(args)
    val driver = new MesosExecutorDriver(new ShellExecutor())
    val exitCode = if (driver.run() == Status.DRIVER_STOPPED) 0 else 1
    System.exit(exitCode)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment