#!/usr/bin/env python

from __future__ import print_function

import glob
import sys
import threading
import time
import traceback

try:
    # Python3
    from queue import Queue
    from queue import Empty
except ImportError:
    # Python2
    from Queue import Queue
    from Queue import Empty

try:
    # Python3
    import asyncio
except ImportError:
    # Python2
    import trollius as asyncio

try:
    # Python3
    from io import StringIO
except ImportError:
    # Python2
    from StringIO import StringIO

from concurrent.futures import ThreadPoolExecutor

from itertools import tee

from osrf_pycommon.process_utils import async_execute_process
from osrf_pycommon.process_utils import AsyncSubprocessProtocol
from osrf_pycommon.process_utils import get_loop

from catkin_tools.common import format_time_delta
from catkin_tools.common import format_time_delta_short
from catkin_tools.common import log
from catkin_tools.common import wide_log

from jobserver import JobServer

from catkin_tools.terminal_color import ansi
from catkin_tools.terminal_color import fmt
from catkin_tools.terminal_color import sanitize
from catkin_tools.terminal_color import ColorMapper

# This map translates more human-readable format strings into colorized versions
_color_translation_map = {
    # 'output': 'colorized_output'
    '': fmt('@!' + sanitize('') + '@|'),
    # Job starting
    "Starting >>> {:<{}}":
        fmt("Starting @!@{gf}>>>@| @!@{cf}{:<{}}@|"),
    # Job finishing
    "Finished <<< {:<{}} [ {} ]":
        fmt("@!@{kf}Finished@| @{gf}<<<@| @{cf}{:<{}}@| [ @{yf}{}@| ]"),
    "Failed <<< {:<{}} [ {} ]":
        fmt("@!@{rf}Failed@| @{rf}<<<@| @{cf}{:<{}}@| [ @{yf}{}@| ]"),
    # Job abandoning
    "Abandoned <<< {:<{}} [ {} ]":
        fmt("@!@{rf}Abandoned@| @{rf}<<<@| @{cf}{:<{}}@| [ @{yf}{}@| ]"),
    "Depends on failed job {}":
        fmt("@{yf}Depends on failed job @!{}@|"),
    "Depends on failed job {} via {}":
        fmt("@{yf}Depends on failed job @!{}@| @{yf}via @!{}@|"),
    # Stage finishing
    "{}:{}":
        fmt("@{cf}{}@|:@{bf}{}@|"),
    "Failed <<< {}:{:<{}} [ Exited with code {} ]":
        fmt("@!@{rf}Failed@| @{rf}<<<@| @{cf}{}@|:@{bf}{:<{}}@|[ @{yf}Exited code @!@{yf}{}@| ]"),
    "Warnings <<< {}:{}":
        fmt("@!@{yf}Warnings@| @{yf}<<<@| @{cf}{}@|:@{bf}{}@|"),
    "Errors <<< {}:{}":
        fmt("@!@{rf}Errors@| @{rf}<<<@| @{cf}{}@|:@{bf}{}@|"),
}

color_mapper = ColorMapper(_color_translation_map)
clr = color_mapper.clr
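
# For illustration (job id and field width are hypothetical): looking up a
# template through clr() returns the colorized version from the map above,
# which can then be formatted like any other format string:
#   clr('Starting >>> {:<{}}').format('job_a', 12)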


def colorize_cmake(line):
    """Colorizes output from CMake

    :param line: one, new line terminated, line from `cmake` which needs coloring.
    :type line: str
    """
    cline = sanitize(line)
    if line.startswith('-- '):
        cline = '@{cf}-- @|' + cline[len('-- '):]
        if ':' in cline:
            split_cline = cline.split(':')
            cline = split_cline[0] + ':@{yf}' + ':'.join(split_cline[1:]) + '@|'
    if line.lower().startswith('warning'):
        # WARNING
        cline = fmt('@{yf}') + cline
    if line.startswith('CMake Warning'):
        # CMake Warning...
        cline = cline.replace('CMake Warning', '@{yf}@!CMake Warning@|')
    if line.startswith('ERROR:'):
        # ERROR:
        cline = cline.replace('ERROR:', '@!@{rf}ERROR:@|')
    if line.startswith('CMake Error'):
        # CMake Error...
        cline = cline.replace('CMake Error', '@{rf}@!CMake Error@|')
    if line.startswith('Call Stack (most recent call first):'):
        # CMake Call Stack
        cline = cline.replace('Call Stack (most recent call first):',
                              '@{cf}@_Call Stack (most recent call first):@|')
    return fmt(cline)


def split(values, cond):
    """Split an iterable based on a condition."""
    head, tail = tee((cond(v), v) for v in values)
    return [v for c, v in head if c], [v for c, v in tail if not c]
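
# A quick sketch of split() with hypothetical values:
#   evens, odds = split([1, 2, 3, 4], lambda x: x % 2 == 0)
#   # evens == [2, 4], odds == [1, 3]
# process_jobs() below uses it to partition jobs into ready and pending sets.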


class ExecutorEvent(object):
    """Structure for events generated by the Executor.

    Events can be jobs starting/finishing, commands starting/failing/finishing,
    commands producing output (each line is an event), or when the executor
    quits or fails.
    """
    # TODO: Make this a map of ID -> fields
    EVENT_IDS = [
        'JOB_STATUS',      # A report of the current status of all jobs
        'QUEUED_JOB',      # A job has been queued to be executed
        'STARTED_JOB',     # A job has started to be executed
        'FINISHED_JOB',    # A job has finished executing (succeeded or failed)
        'ABANDONED_JOB',   # A job has been abandoned for some reason
        'STARTED_STAGE',   # A job stage has started to be executed
        'FINISHED_STAGE',  # A job stage has finished executing (succeeded or failed)
        'STAGE_PROGRESS',  # A job stage has executed partially
        'STDOUT',          # A status message from a job
        'STDERR'           # A warning or error message from a job
    ]

    def __init__(self, event_id, **kwargs):
        """Create a new event.

        :param event_id: One of the valid EVENT_IDS
        :param **kwargs: The additional data to be passed along with this event.
        """
        # Store the time this event was generated
        self.time = time.time()

        # Make sure the event ID is valid
        if event_id not in ExecutorEvent.EVENT_IDS:
            print(ExecutorEvent.EVENT_IDS)
            raise ValueError("The event ID %s is not a valid executor event." % event_id)

        # Store the event data
        self.event_id = event_id
        self.data = kwargs
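
# A minimal example event (data values are hypothetical); this mirrors what
# IOBufferLogger.out() below generates for each line of captured output:
#   ExecutorEvent('STDOUT', job_id='job_a', label='grep', data='some output')
# Constructing an event with an unknown ID raises ValueError.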


class IOBufferContainer(object):
    """A simple buffer container for use in logging."""

    def __init__(self):
        self.stdout_buffer = b""
        self.stderr_buffer = b""
        self.interleaved_buffer = b""


class IOBufferLogger(IOBufferContainer):
    """This is a logging class to be used instead of sys.stdout and sys.stderr
    in FunStage operations.

    This class also generates `stdout` and `stderr` events.
    """

    def __init__(self, job_id, label, event_queue):
        IOBufferContainer.__init__(self)
        self.job_id = job_id
        self.label = label
        self.event_queue = event_queue

    def out(self, data):
        self.stdout_buffer += data + '\n'
        self.interleaved_buffer += data + '\n'
        self.event_queue.put(ExecutorEvent(
            'STDOUT',
            job_id=self.job_id,
            label=self.label,
            data=data))

    def err(self, data):
        self.stderr_buffer += data + '\n'
        self.interleaved_buffer += data + '\n'
        self.event_queue.put(ExecutorEvent(
            'STDERR',
            job_id=self.job_id,
            label=self.label,
            data=data))


class IOBufferProtocol(IOBufferContainer, AsyncSubprocessProtocol):
    """An asyncio protocol that collects stdout and stderr.

    This class also generates `stdout` and `stderr` events.

    Since the underlying asyncio API constructs the actual protocols, this
    class provides a factory method to inject the job and stage information
    into the created protocol.
    """

    def __init__(self, job_id, label, event_queue, *args, **kwargs):
        IOBufferContainer.__init__(self)
        AsyncSubprocessProtocol.__init__(self, *args, **kwargs)
        self.job_id = job_id
        self.label = label
        self.event_queue = event_queue

    @staticmethod
    def factory(job_id, label, event_queue):
        """Factory method for constructing with job metadata."""
        def init_proxy(*args, **kwargs):
            return IOBufferProtocol(job_id, label, event_queue, *args, **kwargs)
        return init_proxy

    def on_stdout_received(self, data):
        self.stdout_buffer += data
        self.interleaved_buffer += data
        self.event_queue.put(ExecutorEvent(
            'STDOUT',
            job_id=self.job_id,
            label=self.label,
            data=data))

    def on_stderr_received(self, data):
        self.stderr_buffer += data
        self.interleaved_buffer += data
        self.event_queue.put(ExecutorEvent(
            'STDERR',
            job_id=self.job_id,
            label=self.label,
            data=data))
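
# Sketch of how the factory is meant to be used (names are illustrative): the
# factory closes over the job metadata, and async_execute_process then
# constructs the protocol instance itself, as in async_job() below:
#   factory = IOBufferProtocol.factory('job_a', 'configure', event_queue)
#   # async_execute_process(factory, cmd=['cmake', '..'], ...) calls
#   # factory(*args, **kwargs) internally to build the bound protocol.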


@asyncio.coroutine
def async_job(job, executor, event_queue):
    """Run a sequence of Stages from a Job and collect their output.

    :param job: A Job instance
    :param executor: A thread pool executor for blocking stages
    :param event_queue: A queue for asynchronous events
    """
    # Initialize success flag
    all_stages_succeeded = True

    # Execute each stage of this job
    for stage in job.stages:
        # Abort the job if one of the stages has failed and the job is not
        # configured to continue on failure
        if not job.continue_on_failure and not all_stages_succeeded:
            break

        # Notify stage started
        event_queue.put(ExecutorEvent(
            'STARTED_STAGE',
            job_id=job.jid,
            label=stage.label))

        if type(stage) is CmdStage:
            # Initiate the command
            transport, logger = yield asyncio.From(
                async_execute_process(
                    stage.protocol.factory(job.jid, stage.label, event_queue),
                    cmd=stage.command,
                    **stage.kwargs))
            # Asynchronously yield until this command is completed
            retcode = yield asyncio.From(logger.complete)
        elif type(stage) is FunStage:
            # Create logger to be used instead of using stdout / stderr
            logger = IOBufferLogger(job.jid, stage.label, event_queue)
            try:
                # Asynchronously yield until this function is completed
                retcode = yield asyncio.From(get_loop().run_in_executor(executor, stage.function, logger, event_queue))
            except:
                logger.err(str(traceback.format_exc()))
                retcode = 1

        # Set whether this stage succeeded
        stage_succeeded = (retcode == 0)

        # Update success tracker from this stage
        all_stages_succeeded = all_stages_succeeded and stage_succeeded

        # Store the results from this stage
        event_queue.put(ExecutorEvent(
            'FINISHED_STAGE',
            job_id=job.jid,
            label=stage.label,
            succeeded=stage_succeeded,
            stdout=logger.stdout_buffer,
            stderr=logger.stderr_buffer,
            interleaved=logger.interleaved_buffer,
            retcode=retcode))

    # Finally, return whether all stages of the job completed
    raise asyncio.Return(job.jid, all_stages_succeeded)


@asyncio.coroutine
def process_jobs(
        jobs,
        event_queue,
        continue_on_failure=False,
        continue_without_deps=False):
    """Process a number of jobs asynchronously.

    :param jobs: A list of Jobs
    :param event_queue: A python queue for reporting events.
    :param continue_on_failure: Keep running jobs even if one fails.
    :param continue_without_deps: Run jobs even if their dependencies fail.
    """
    # Initialize list of ready and pending jobs (jobs not ready to be executed)
    queued_jobs, pending_jobs = split(jobs, lambda j: len(j.deps) == 0)

    # Dict of active jobs job_id -> future
    active_jobs = {}
    # Dict of completed jobs job_id -> succeeded
    completed_jobs = {}
    # List of jobs whose deps failed
    abandoned_jobs = []

    # Create a thread pool executor for blocking python stages in the asynchronous jobs
    executor = ThreadPoolExecutor(max_workers=JobServer.max_jobs())

    # Process all jobs asynchronously until there are none left
    while len(active_jobs) + len(queued_jobs) + len(pending_jobs) > 0:
        # Get a token generator from the job server
        token_generator = JobServer.try_acquire()

        # Activate jobs while the jobserver dispenses tokens
        while len(queued_jobs) > 0 and next(token_generator) is not None:
            # Pop a job off of the job queue
            job = queued_jobs.pop(0)
            event_queue.put(ExecutorEvent(
                'STARTED_JOB',
                job_id=job.jid))
            # Start the job coroutine
            active_jobs[job.jid] = async_job(job, executor, event_queue)

        # Process jobs as they complete asynchronously
        for job_completed in asyncio.as_completed(list(active_jobs.values())):
            # Report running jobs
            event_queue.put(ExecutorEvent(
                'JOB_STATUS',
                active=list(active_jobs.keys()),
                queued=[j.jid for j in queued_jobs],
                pending=[j.jid for j in pending_jobs],
                abandoned=[j.jid for j in abandoned_jobs]))

            # Capture a result once the job has finished
            job_id, succeeded = yield asyncio.From(job_completed)

            # Release a jobserver token now that this job has finished
            JobServer.release()

            # Generate event with the results of this job
            event_queue.put(ExecutorEvent(
                'FINISHED_JOB',
                job_id=job_id,
                succeeded=succeeded))

            # Remove the job from the active jobs dict
            del active_jobs[job_id]

            # Add the job to the completed list
            completed_jobs[job_id] = succeeded

            # Handle failure modes
            if not succeeded:
                # By default don't abandon any other jobs because this job failed
                new_abandoned_jobs = []

                # Handle different abandoning policies
                if not continue_on_failure:
                    # Abort all pending jobs if any job fails
                    new_abandoned_jobs = queued_jobs + pending_jobs
                    queued_jobs = []
                    pending_jobs = []

                    # Notify that jobs have been abandoned
                    for abandoned_job in new_abandoned_jobs:
                        event_queue.put(ExecutorEvent(
                            'ABANDONED_JOB',
                            job_id=abandoned_job.jid,
                            reason='PEER_FAILED',
                            peer_job_id=job_id))

                elif not continue_without_deps:
                    unhandled_abandoned_job_ids = [job_id]

                    # Abandon jobs which depend on abandoned jobs
                    while len(unhandled_abandoned_job_ids) > 0:
                        # Get the abandoned job
                        abandoned_job_id = unhandled_abandoned_job_ids.pop(0)

                        # Abandon all pending jobs which depend on this job_id
                        unhandled_abandoned_jobs, pending_jobs = split(
                            pending_jobs,
                            lambda j: abandoned_job_id in j.deps)

                        # Handle each new abandoned job
                        for abandoned_job in unhandled_abandoned_jobs:
                            new_abandoned_jobs.append(abandoned_job)
                            # Notify if any jobs have been abandoned
                            event_queue.put(ExecutorEvent(
                                'ABANDONED_JOB',
                                job_id=abandoned_job.jid,
                                reason='DEP_FAILED',
                                direct_dep_job_id=abandoned_job_id,
                                dep_job_id=job_id))

                        # Add additional job ids to check
                        unhandled_abandoned_job_ids.extend([j.jid for j in unhandled_abandoned_jobs])

                # Update the abandoned jobs
                abandoned_jobs.extend(new_abandoned_jobs)

            # Update the list of ready jobs (based on completed job dependencies)
            new_queued_jobs, pending_jobs = split(
                pending_jobs,
                lambda j: j.all_deps_completed(completed_jobs))
            queued_jobs.extend(new_queued_jobs)

            # Notify of newly queued jobs
            for queued_job in new_queued_jobs:
                event_queue.put(ExecutorEvent(
                    'QUEUED_JOB',
                    job_id=queued_job.jid))

    # Report final job status so status consumers can observe completion
    event_queue.put(ExecutorEvent(
        'JOB_STATUS',
        active=list(active_jobs.keys()),
        queued=[j.jid for j in queued_jobs],
        pending=[j.jid for j in pending_jobs],
        abandoned=[j.jid for j in abandoned_jobs]))


class Job(object):
    """A Job is a series of operations, each of which is considered a "stage" of the job."""

    def __init__(self, jid, deps, stages, continue_on_failure=True):
        self.jid = jid
        self.deps = deps
        self.stages = stages
        self.continue_on_failure = continue_on_failure

    def all_deps_completed(self, completed_jobs):
        """Return True if all dependencies have been completed."""
        return all([dep_id in completed_jobs for dep_id in self.deps])

    def all_deps_succeeded(self, completed_jobs):
        """Return True if all dependencies have been completed and succeeded."""
        return all([completed_jobs.get(dep_id, False) for dep_id in self.deps])

    def any_deps_failed(self, completed_jobs):
        """Return True if any dependencies which have been completed have failed."""
        return any([not completed_jobs.get(dep_id, True) for dep_id in self.deps])
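
# Example (hypothetical ids): a job that depends on two other jobs.
#   job = Job('job_c', deps=['job_a', 'job_b'], stages=[])
#   job.all_deps_completed({'job_a': True})                  # False, job_b still pending
#   job.all_deps_succeeded({'job_a': True, 'job_b': True})   # True
#   job.any_deps_failed({'job_a': False, 'job_b': True})     # True, job_a failed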


class Stage(object):
    """A description of one of the serially-executed stages of a Job.

    Like Jobs, Stages are stateless, and simply describe what needs to be done
    and how to do it.
    """

    def __init__(self, label):
        self.label = label or str(label)


class CmdStage(Stage):
    """Job stage that describes a system command.

    :param label: The label for the stage
    :param command: A list of strings composing a system command
    :param protocol: A protocol class to use for this stage

    Additional kwargs are passed to `async_execute_process`
    """

    def __init__(self, label, command, protocol=None, **kwargs):
        if not type(command) in [list, tuple] or not all([type(s) is str for s in command]):
            raise ValueError('Command stage must be a list of strings: {}'.format(command))
        super(CmdStage, self).__init__(label)
        self.command = command
        self.protocol = protocol or IOBufferProtocol
        self.kwargs = kwargs

        # Emulate tty for cli colors
        self.kwargs.setdefault('emulate_tty', True)
        # Capture stderr and stdout separately
        self.kwargs.setdefault('stderr_to_stdout', False)


class FunStage(Stage):
    """Job stage that describes a python function.

    :param label: The label for the stage
    :param function: A python function which returns 0 on success
    """

    def __init__(self, label, function):
        if not callable(function):
            raise ValueError('Function stage must be callable.')
        super(FunStage, self).__init__(label)
        self.function = function
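
# Example stage construction (command and function are illustrative). Extra
# keyword arguments to CmdStage, such as a working directory, are forwarded to
# async_execute_process:
#   CmdStage('configure', ['cmake', '..'], cwd='/tmp/build_dir')
#
#   def touch(logger, event_queue):
#       logger.out('touching...')
#       return 0
#   FunStage('touch', touch)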


def foo(logger, event_queue):
    """Simple blocking function that sleeps for 1 second and then raises an exception"""
    logger.out('Fooing...')
    time.sleep(1.0)
    logger.err('Can\'t foo!')
    raise RuntimeError()
    logger.out('Done fooing.')
    return 0


def bar(logger, event_queue):
    """Simple blocking function that sleeps for 0.25 seconds"""
    logger.out('Baring...')
    time.sleep(0.25)
    logger.out('Done baring.')
    return 0


def baz(logger, event_queue):
    """Simple blocking function that sleeps for 1 second and then writes to err"""
    logger.out('Bazing...')
    time.sleep(1.0)
    logger.err('You shouldn\'t be bazing!')
    logger.err('You should be baring instead.')
    return 0


class ConsoleStatusThread(threading.Thread):
    """Status thread for displaying events to the console."""

    def __init__(
            self,
            label,
            jobs,
            event_queue,
            show_stage_events=False,
            show_buffered_stdout=False,
            show_buffered_stderr=True,
            show_live_stdout=False,
            show_live_stderr=False,
            show_active_status=True,
            active_status_rate=10.0):
        """
        :param label: The label for this task (build, clean, etc)
        :param event_queue: The event queue used by an Executor
        :param show_stage_events: Show events relating to stages in each job
        :param show_buffered_stdout: Show stdout from jobs as they finish
        :param show_buffered_stderr: Show stderr from jobs as they finish
        :param show_live_stdout: Show stdout lines from jobs as they're generated
        :param show_live_stderr: Show stderr lines from jobs as they're generated
        :param show_active_status: Periodically show a status line displaying the active jobs
        :param active_status_rate: The rate in Hz at which the status line should be printed
        """
        super(ConsoleStatusThread, self).__init__()
        self.label = label
        self.event_queue = event_queue
        self.show_stage_events = show_stage_events
        self.show_buffered_stdout = show_buffered_stdout
        self.show_buffered_stderr = show_buffered_stderr
        self.show_live_stdout = show_live_stdout
        self.show_live_stderr = show_live_stderr
        self.show_active_status = show_active_status
        self.active_status_rate = max(active_status_rate, 0.1)

        self.jobs = dict([(j.jid, j) for j in jobs])
        self.max_jid_length = max([len(jid) + max([len(s.label) for s in job.stages]) for jid, job in self.jobs.items()])

    def run(self):
        active_jobs = []
        queued_jobs = []
        pending_jobs = []
        start_times = dict()
        end_times = dict()
        start_time = time.time()

        while True:
            if self.show_active_status:
                # Try to get an event from the queue (non-blocking)
                try:
                    res = self.event_queue.get(False)
                except Empty:
                    # Print live status (overwrites last line)
                    wide_log('[{} {} s] [{}/{} jobs] {}'.format(
                        self.label,
                        format_time_delta_short(time.time() - start_time),
                        JobServer.running_jobs(),
                        JobServer.max_jobs(),
                        active_jobs), end='\r')
                    sys.stdout.flush()
                    time.sleep(1.0 / self.active_status_rate)
                    continue
            else:
                # Try to get an event from the queue (blocking)
                try:
                    res = self.event_queue.get(True)
                except Empty:
                    break

            # A `None` event is a signal to terminate
            if res is None:
                break

            # Handle the received events
            eid = res.event_id

            if 'JOB_STATUS' == eid:
                # Check if all jobs have finished in some way
                if all([len(res.data[t]) == 0 for t in ['active', 'queued', 'pending']]):
                    break
                active_jobs = res.data['active']
                queued_jobs = res.data['queued']
                pending_jobs = res.data['pending']

            elif 'STARTED_JOB' == eid:
                wide_log(clr('Starting >>> {:<{}}').format(
                    res.data['job_id'],
                    self.max_jid_length))
                start_times[res.data['job_id']] = res.time

            elif 'FINISHED_JOB' == eid:
                end_times[res.data['job_id']] = res.time
                duration = format_time_delta(end_times[res.data['job_id']] - start_times[res.data['job_id']])
                if res.data['succeeded']:
                    wide_log(clr('Finished <<< {:<{}} [ {} ]').format(
                        res.data['job_id'],
                        self.max_jid_length,
                        duration))
                else:
                    wide_log(clr('Failed <<< {:<{}} [ {} ]').format(
                        res.data['job_id'],
                        self.max_jid_length,
                        duration))

            elif 'ABANDONED_JOB' == eid:
                if 'DEP_FAILED' == res.data['reason']:
                    direct = res.data['dep_job_id'] == res.data['direct_dep_job_id']
                    if direct:
                        reason = clr('Depends on failed job {}').format(res.data['dep_job_id'])
                    else:
                        reason = clr('Depends on failed job {} via {}').format(
                            res.data['dep_job_id'],
                            res.data['direct_dep_job_id'])
                elif 'PEER_FAILED' == res.data['reason']:
                    reason = clr('Unrelated job failed')
                wide_log(clr('Abandoned <<< {:<{}} [ {} ]').format(
                    res.data['job_id'],
                    self.max_jid_length,
                    reason))

            elif 'STARTED_STAGE' == eid:
                if self.show_stage_events:
                    wide_log('Starting >>> {}:{}'.format(
                        res.data['job_id'],
                        res.data['label']))

            elif 'FINISHED_STAGE' == eid:
                if self.show_buffered_stderr and len(res.data['stderr']) > 0:
                    if res.data['succeeded']:
                        wide_log(clr('Warnings <<< {}:{}').format(
                            res.data['job_id'],
                            res.data['label']))
                    else:
                        wide_log(clr('Errors <<< {}:{}').format(
                            res.data['job_id'],
                            res.data['label']))
                    prefix_color = '@!@{yf}' if res.data['succeeded'] else '@!@{rf}'
                    prefix = clr(prefix_color + '> @|')
                    for line in res.data['stderr'].splitlines():
                        wide_log(prefix + line)
                if res.data['succeeded']:
                    if self.show_stage_events:
                        wide_log('Finished <<< {}:{}'.format(
                            res.data['job_id'],
                            res.data['label']))
                else:
                    wide_log(clr('Failed <<< {}:{:<{}} [ Exited with code {} ]').format(
                        res.data['job_id'],
                        res.data['label'],
                        max(0, self.max_jid_length - len(res.data['job_id'])),
                        res.data['retcode']))

            elif 'STDERR' == eid:
                if self.show_live_stderr:
                    prefix = '{} [{}]: '.format(
                        res.data['job_id'],
                        res.data['label'])
                    wide_log('\n'.join(prefix + l for l in res.data['data'].splitlines()))


if __name__ == '__main__':
    # Initialize jobserver
    JobServer.initialize(max_jobs=5)

    # Get event loop
    loop = get_loop()

    # Queue for communicating status
    event_queue = Queue()

    # The job list is a list of Job objects, each composed of multiple stages
    # which are run sequentially. Jobs can specify their dependencies in terms
    # of the ids of other jobs.
    jobs = [
        Job('job_a', deps=[], stages=[CmdStage('grep', ['grep', 'hosts'] + glob.glob('/etc/*'))]),
        Job('job_b', deps=[], stages=[CmdStage('sleep', ['sleep', '2'])]),
        Job('job_c', deps=['job_b'],
            stages=[
                CmdStage('sleep', ['sleep', '1']),
                CmdStage('sleep', ['sleep', '1'])]
            ),
        Job('job_d1', deps=['job_c'], stages=[CmdStage('sleep', ['sleep', '1.1'])]),
        Job('job_d2', deps=['job_c'], stages=[CmdStage('sleep', ['sleep', '1.2'])]),
        Job('job_d3', deps=['job_c'], stages=[CmdStage('sleep', ['sleep', '1.3'])]),
        Job('job_e', deps=[], stages=[FunStage('foo', foo)]),
        Job('job_f', deps=['job_e'], stages=[FunStage('foo', foo)]),
        Job('job_g', deps=['job_b', 'job_c'],
            stages=[
                FunStage('bar', bar),
                CmdStage('sleep', ['sleep', '0.5']),
                FunStage('bar', bar)]
            ),
        Job('job_h', deps=['job_b'], stages=[FunStage('baz', baz)]),
        Job('job_i', deps=['job_f'], stages=[FunStage('baz', baz)]),
    ]

    # TODO: Check job id uniqueness
    # TODO: Check dependency existence

    try:
        # Spin up status output thread
        status_thread = ConsoleStatusThread('exec', jobs, event_queue)
        status_thread.start()

        # Block while running N jobs asynchronously
        loop.run_until_complete(process_jobs(
            jobs,
            event_queue,
            continue_on_failure=True,
            continue_without_deps=False))
    except KeyboardInterrupt:
        print("Interrupted!")
        event_queue.put(None)
# Copyright 2014 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

from multiprocessing import cpu_count
from tempfile import mkstemp
from termios import FIONREAD

import array
import errno
import fcntl
import os
import re
import subprocess
import time

from catkin_tools.common import log
from catkin_tools.common import version_tuple

JOBSERVER_SUPPORT_MAKEFILE = b'''
all:
\techo $(MAKEFLAGS) | grep -- '--jobserver-fds'
'''


def memory_usage():
    """
    Get used and total memory usage.

    :returns: Used and total memory in bytes
    :rtype: tuple
    """
    # Handle optional psutil support
    try:
        import psutil
        psutil_version = version_tuple(psutil.__version__)
        if psutil_version < (0, 6, 0):
            usage = psutil.phymem_usage()
            used = usage.used
        else:
            usage = psutil.virtual_memory()
            used = usage.total - usage.available
        return used, usage.total
    except ImportError:
        pass

    return None, None
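
# Example (numbers are illustrative):
#   used, total = memory_usage()
#   # e.g. (4294967296, 8589934592) when psutil is available,
#   # or (None, None) when psutil is not installed.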


class JobServer:
    """
    This class implements a GNU make-compatible job server.
    """

    # Singleton jobserver
    _singleton = None

    # Flag designating whether the `make` program supports the GNU Make
    # jobserver interface
    _gnu_make_supported = False

    def __init__(self, max_jobs=None, max_load=None, max_mem=None):
        """
        :param max_jobs: the maximum number of jobs available
        :param max_load: do not dispatch additional jobs if this system load
            value is exceeded
        :param max_mem: do not dispatch additional jobs if system physical
            memory usage exceeds this value (see _set_max_mem for additional
            documentation)
        """
        assert(JobServer._singleton is None)

        if not max_jobs:
            try:
                max_jobs = cpu_count()
            except NotImplementedError:
                log('@{yf}WARNING: Failed to determine the cpu_count, falling back to 1 job as the default.@|')
                max_jobs = 1
        else:
            max_jobs = int(max_jobs)

        self.max_jobs = max_jobs
        self.max_load = max_load
        self._set_max_mem(max_mem)

        self.job_pipe = os.pipe()

        # Initialize the pipe with max_jobs tokens
        for i in range(max_jobs):
            os.write(self.job_pipe[1], b'+')

    @staticmethod
    def _test_gnu_make_support():
        """
        Test if the system 'make' supports the job server implementation.
        """
        fd, makefile = mkstemp()
        os.write(fd, JOBSERVER_SUPPORT_MAKEFILE)
        os.close(fd)

        ret = subprocess.call(['make', '-f', makefile, '-j2'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        os.unlink(makefile)
        return (ret == 0)

    def _set_max_mem(self, max_mem):
        """
        Set the maximum memory usage above which no new jobs are dispatched.

        :param max_mem: String describing the maximum memory that can be used
            on the system. It can either describe memory percentage or absolute
            amount. Use 'P%' for percentage or 'N' for absolute value in bytes,
            'Nk' for kilobytes, 'Nm' for megabytes, and 'Ng' for gigabytes.
        :type max_mem: str
        """
        if max_mem is None:
            self.max_mem = None
            return
        elif type(max_mem) is float or type(max_mem) is int:
            mem_percent = max_mem
        elif type(max_mem) is str:
            m_percent = re.search(r'([0-9]+)\%', max_mem)
            m_abs = re.search(r'([0-9]+)([kKmMgG]{0,1})', max_mem)
            if m_percent is None and m_abs is None:
                self.max_mem = None
                return

            if m_percent:
                mem_percent = m_percent.group(1)
            elif m_abs:
                val = float(m_abs.group(1))
                mag_symbol = m_abs.group(2)

                _, total_mem = memory_usage()

                if mag_symbol == '':
                    mag = 1.0
                elif mag_symbol.lower() == 'k':
                    mag = 1024.0
                elif mag_symbol.lower() == 'm':
                    mag = pow(1024.0, 2)
                elif mag_symbol.lower() == 'g':
                    mag = pow(1024.0, 3)

                mem_percent = 100.0 * val * mag / total_mem

        self.max_mem = max(0.0, min(100.0, float(mem_percent)))
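
    # Examples of accepted max_mem values (illustrative):
    #   self._set_max_mem('70%')  # no new jobs above 70% of physical memory in use
    #   self._set_max_mem('4g')   # absolute limit, converted to a percentage of total memory
    #   self._set_max_mem(None)   # disable the memory check entirely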

    def _load_ok(self):
        if self.max_load is not None:
            try:
                load = os.getloadavg()
                if JobServer.running_jobs() > 0 and load[1] > self.max_load:
                    return False
            except NotImplementedError:
                return True
        return True

    def _mem_ok(self):
        if self.max_mem is not None:
            mem_used, mem_total = memory_usage()
            mem_percent_used = 100.0 * float(mem_used) / float(mem_total)
            if JobServer.running_jobs() > 0 and mem_percent_used > self.max_mem:
                return False
        return True

    def _acquire(self):
        """
        Obtain a job server token. Be sure to call _release() to avoid
        deadlocks.
        """
        try:
            # read a token from the job pipe
            token = os.read(self.job_pipe[0], 1)
            return token
        except OSError as e:
            if e.errno != errno.EINTR:
                raise
        return None

    def _wait_acquire(self):
        """
        Wait until a job server token can be acquired.
        """
        token = None
        while token is None:
            # make sure we're observing load and memory maximums
            if not self._load_ok() or not self._mem_ok():
                time.sleep(0.01)
                continue
            # try to get a job token
            token = self._acquire()
        return token

    def _try_acquire(self):
        """
        Yield a job server token whenever one can be acquired, or None when
        one is not available.
        """
        while True:
            # make sure we're observing load and memory maximums
            if self._load_ok() and self._mem_ok():
                # try to get a job token
                token = self._acquire()
                yield token
            else:
                yield None

    def _release(self):
        """
        Write a token to the job pipe.
        """
        os.write(self.job_pipe[1], b'+')

    @classmethod
    def initialize(cls, *args, **kwargs):
        """
        Initialize the global GNU Make jobserver.

        :param max_jobs: the maximum number of jobs available
        :param max_load: do not dispatch additional jobs if this system load
            value is exceeded
        :param max_mem: do not dispatch additional jobs if system physical
            memory usage exceeds this value
        """
        # Only initialize once
        assert(cls._singleton is None)

        # Check if the jobserver is supported
        cls._gnu_make_supported = cls._test_gnu_make_support()

        if not cls._gnu_make_supported:
            log('@{yf}WARNING: Make job server not supported. The number of Make '
                'jobs may exceed the number of CPU cores.@|')
            return

        # Create the jobserver singleton
        cls._singleton = JobServer(*args, **kwargs)

    @classmethod
    def set_max_mem(cls, max_mem):
        """
        Set the maximum memory usage above which no new jobs are dispatched.

        :param max_mem: String describing the maximum memory that can be used on
            the system. It can either describe memory percentage or absolute amount.
            Use 'P%' for percentage or 'N' for absolute value in bytes, 'Nk' for
            kilobytes, 'Nm' for megabytes, and 'Ng' for gigabytes.
        :type max_mem: str
        """
        cls._singleton._set_max_mem(max_mem)

    @classmethod
    def wait_acquire(cls):
        """
        Block until a job server token is acquired, then return it.
        """
        return cls._singleton._wait_acquire()

    @classmethod
    def try_acquire(cls):
        """
        Yield None until a job server token is acquired, then yield it.
        """
        while True:
            # make sure we're observing load and memory maximums
            if cls._singleton._load_ok() and cls._singleton._mem_ok():
                # try to get a job token
                token = cls._singleton._acquire()
                yield token
            else:
                yield None

    @classmethod
    def release(cls):
        """
        Release a job server token.
        """
        cls._singleton._release()

    @classmethod
    def gnu_make_supported(cls):
        return cls._gnu_make_supported

    @classmethod
    def gnu_make_args(cls):
        """
        Get required arguments for spawning child GNU Make processes.
        """
        if cls._gnu_make_supported:
            return ["--jobserver-fds=%d,%d" % cls._singleton.job_pipe, "-j"]
        else:
            return []

    @classmethod
    def max_jobs(cls):
        """
        Get the maximum number of jobs.
        """
        return cls._singleton.max_jobs

    @classmethod
    def running_jobs(cls):
        """
        Try to estimate the number of currently running jobs.
        """
        if not cls._gnu_make_supported:
            return '?'
        try:
            buf = array.array('i', [0])
            if fcntl.ioctl(cls._singleton.job_pipe[0], FIONREAD, buf) == 0:
                return cls._singleton.max_jobs - buf[0]
        except NotImplementedError:
            pass
        except OSError:
            pass

        return cls._singleton.max_jobs


class JobGuard:
    """
    Context manager representing a jobserver job.
    """

    def __enter__(self):
        JobServer.wait_acquire()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        JobServer.release()
        return False
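
# Example usage of JobGuard (sketch): hold a jobserver token for the duration
# of a block of work.
#   with JobGuard():
#       pass  # do work that should count against the job limit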
Starting >>> job_a
Starting >>> job_b
Starting >>> job_e
Errors <<< job_a:grep
> grep: /etc/mtab.fuselock: Permission denied
> grep: /etc/fuse.conf: Permission denied
> grep: /etc/docker: Permission denied
> grep: /etc/blkid.tab: No such file or directory
> grep: /etc/sudoers: Permission denied
> grep: /etc/group-: Permission denied
> grep: /etc/gshadow: Permission denied
> grep: /etc/at.deny: Permission denied
> grep: /etc/shadow: Permission denied
> grep: /etc/passwd-: Permission denied
> grep: /etc/gshadow-: Permission denied
> grep: /etc/shadow-: Permission denied
Failed <<< job_a:grep [ Exited code 2 ]
Failed <<< job_a [ 0.0 seconds ]
Errors <<< job_e:foo
> Can't foo!
> Traceback (most recent call last):
>   File "./asynctest.py", line 307, in async_job
>     retcode = yield asyncio.From(get_loop().run_in_executor(executor, stage.function, logger, event_queue))
> RuntimeError
>
Failed <<< job_e:foo [ Exited code 1 ]
Failed <<< job_e [ 1.0 seconds ]
Abandoned <<< job_f [ Depends on failed job job_e ]
Abandoned <<< job_i [ Depends on failed job job_e via job_f ]
Finished <<< job_b [ 2.0 seconds ]
Starting >>> job_c
Starting >>> job_h
Warnings <<< job_h:baz
> You shouldn't be bazing!
> You should be baring instead.
Finished <<< job_h [ 1.0 seconds ]
Finished <<< job_c [ 2.0 seconds ]
Starting >>> job_d1
Starting >>> job_d2
Starting >>> job_d3
Starting >>> job_g
Finished <<< job_g [ 1.0 seconds ]
Finished <<< job_d1 [ 1.1 seconds ]
Finished <<< job_d2 [ 1.2 seconds ]
Finished <<< job_d3 [ 1.3 seconds ]