Created
June 16, 2017 10:22
-
-
Save jasonlai/80e072a3cb0b48f3d1876c4c56b2e6f8 to your computer and use it in GitHub Desktop.
Adhoc Mesos Executor
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Library target 'executor': built from the sources matched by rglobs('*.py')
# and published through setup_py() as 'executor' version 0.0.1.
python_library(
    name='executor',
    provides=setup_py(name='executor', version='0.0.1'),
    sources=rglobs('*.py'),
    dependencies=[
        '3rdparty/python:mesos.executor',
        '3rdparty/python:mesos.interface',
        '3rdparty/python:pex',
        '3rdparty/python:protobuf',
    ],
)
# Binary target 'main': depends on the ':executor' library in this BUILD file
# and starts at the 'executor:main' entry point.
python_binary(
    name='main',
    dependencies=[':executor'],
    entry_point='executor:main',
    always_write_cache=True,
)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import print_function | |
import sys | |
from threading import Thread | |
from time import sleep | |
from mesos.interface import Executor | |
from mesos.interface.mesos_pb2 import ( | |
DRIVER_STOPPED, | |
TASK_RUNNING, | |
TASK_FINISHED, | |
TaskID, | |
TaskStatus, | |
) | |
from mesos.executor import MesosExecutorDriver | |
class MyExecutor(Executor):
    """Ad-hoc executor that reports TASK_RUNNING periodically for each task.

    Every launched task gets its own worker thread. The thread logs a few
    details about the task's container, then sends a TASK_RUNNING status
    update every ``STATUS_INTERVAL_SECS`` seconds until the task is asked to
    stop (``killTask``) or the executor is shut down (``shutdown``), at which
    point it sends a final TASK_FINISHED update.
    """

    # Seconds to wait between two consecutive TASK_RUNNING updates.
    STATUS_INTERVAL_SECS = 30

    # Payload attached to every status update. Written as a bytes literal:
    # it is the same value on Python 2, and it is what a protobuf ``bytes``
    # field requires on Python 3.
    STATUS_DATA = b'data with a \0 byte'

    def __init__(self):
        super(MyExecutor, self).__init__()
        # Maps task_id.value -> threading.Event used to stop that task's loop.
        self._stop_events = {}

    def _stop_event_for(self, task_id_value):
        """Return the stop event for a task id, creating it on first use."""
        # Local import: the module itself only imports Thread from threading.
        from threading import Event
        return self._stop_events.setdefault(task_id_value, Event())

    def _make_status(self, task_id, state):
        """Build a TaskStatus for ``task_id`` in ``state`` with the test payload."""
        return TaskStatus(
            task_id=TaskID(value=task_id.value),
            state=state,
            data=self.STATUS_DATA,
        )

    def _run_task(self, driver, task):
        """Worker-thread body: report the task as running until it is stopped.

        Args:
            driver: executor driver used to send status updates.
            task: the TaskInfo passed to ``launchTask``.
        """
        task_id = task.task_id
        container = task.container
        stop = self._stop_event_for(task_id.value)

        self.log('Running task %s' % task_id.value)
        self.log('task.container: %s' % container)
        # bool() of a protobuf message is always True, so use HasField() to
        # report whether the optional sub-messages were actually set.
        has_mesos = container.HasField('mesos')
        has_image = has_mesos and container.mesos.HasField('image')
        self.log('task.container.mesos: %s' % has_mesos)
        self.log('task.container.mesos.image: %s' % has_image)

        while not stop.is_set():
            self.log('Sending status update...')
            driver.sendStatusUpdate(self._make_status(task_id, TASK_RUNNING))
            # Event.wait() sleeps like sleep() but returns early once the
            # task is asked to stop.
            stop.wait(self.STATUS_INTERVAL_SECS)

        driver.sendStatusUpdate(self._make_status(task_id, TASK_FINISHED))
        self.log('Sent status update')
        self._stop_events.pop(task_id.value, None)

    def log(self, msg):
        """Print ``msg`` to stdout and flush immediately."""
        print(msg)
        # Flush so messages are not held in a buffer when stdout is
        # redirected to a file.
        sys.stdout.flush()

    def registered(self, driver, executor_info, framework_info, slave_info):
        """Log the registration details and keep references to them."""
        self.log('registered() called with:')
        self.log(' ExecutorInfo: %s' % executor_info)
        self.log(' FrameworkInfo: %s' % framework_info)
        self.log(' SlaveInfo: %s' % slave_info)
        self._driver = driver
        self._executor_info = executor_info
        self._framework_info = framework_info
        self._slave_info = slave_info

    def reregistered(self, driver, slave_info):
        """Log the SlaveInfo received on re-registration."""
        self.log('reregistered() called with:')
        self.log(' SlaveInfo: %s' % slave_info)

    def disconnected(self, driver):
        """Log that the executor was disconnected."""
        self.log('disconnected() called')

    def launchTask(self, driver, task):
        """Start a background thread that runs ``task``."""
        # Register the stop event before the thread starts so a killTask()
        # arriving right away can already find it.
        self._stop_event_for(task.task_id.value)
        thread = Thread(target=self._run_task, args=(driver, task))
        # Daemon thread: it must not keep the process alive once
        # driver.run() has returned and main() exits.
        thread.daemon = True
        thread.start()

    def killTask(self, driver, task_id):
        """Ask the worker thread of ``task_id`` to finish; unknown ids are ignored."""
        stop = self._stop_events.get(task_id.value)
        if stop is not None:
            stop.set()

    def shutdown(self, driver):
        """Ask every running worker thread to finish."""
        # Iterate over a copy: workers remove their own entry when they end.
        for stop in list(self._stop_events.values()):
            stop.set()

    def frameworkMessage(self, driver, message):
        """Echo a received framework message back through the driver."""
        driver.sendFrameworkMessage(message)
def main():
    """Run a MyExecutor under a MesosExecutorDriver.

    Returns:
        0 when the driver's run() result is DRIVER_STOPPED, otherwise 1.
    """
    print('Starting executor....')
    driver = MesosExecutorDriver(MyExecutor())
    final_status = driver.run()
    return 0 if final_status == DRIVER_STOPPED else 1
if __name__ == '__main__':
    # Use main()'s return value as the process exit status.
    exit_code = main()
    sys.exit(exit_code)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment