Skip to content

Instantly share code, notes, and snippets.

@dansondergaard
Created August 30, 2016 11:54
Show Gist options
  • Select an option

  • Save dansondergaard/a49aab555abd4809fbcf2a290ef206a7 to your computer and use it in GitHub Desktop.

Select an option

Save dansondergaard/a49aab555abd4809fbcf2a290ef206a7 to your computer and use it in GitHub Desktop.
class Backend(metaclass=BackendType):
    """Representation of a backend.

    A backend is initialized with an instance of
    :class:`gwf.core.PreparedWorkflow`.

    The query/action methods :meth:`submitted`, :meth:`running`,
    :meth:`submit` and :meth:`cancel` all raise :class:`NotImplementedError`
    in this base class and must be provided by concrete backends.
    :meth:`schedule` is implemented here on top of those methods.
    """

    def __init__(self, workflow):
        """Store the prepared workflow this backend operates on.

        :param workflow: the workflow whose targets will be scheduled.
        :raises WorkflowNotPreparedError: if *workflow* is not an instance
            of ``PreparedWorkflow``.
        """
        if not isinstance(workflow, PreparedWorkflow):
            raise WorkflowNotPreparedError()
        self.workflow = workflow

    def submitted(self, targets):
        """Return whether the target has been submitted."""
        # NOTE(review): the parameter is named ``targets`` but ``schedule``
        # calls this with a single target -- confirm the intended contract.
        raise NotImplementedError()

    def running(self, targets):
        """Return whether the target is running."""
        raise NotImplementedError()

    def submit(self, targets):
        """Submit a target."""
        raise NotImplementedError()

    def cancel(self, targets):
        """Cancel a target."""
        raise NotImplementedError()

    def _get_schedule_for_target(self, root):
        """Linearize the targets to be run.

        :param root: the target to build a schedule for.
        :return: a ``(schedule, scheduled_targets)`` pair where ``schedule``
            is a list of targets in the order yielded by ``dfs`` and
            ``scheduled_targets`` is the same targets as a set. Both are
            empty when *root* is already submitted.
        """
        # If the target is already in the queue we just dismiss the scheduling
        # right away... this because we need to handle dependent nodes in the
        # queue differently, since for those we need to wait for completion.
        if self.submitted(root):
            return [], set()

        # Keep targets that are already submitted as well as those that
        # should run, so that dependencies on queued targets are preserved.
        schedule = [
            target
            for target in dfs(root, self.workflow.dependencies)
            if self.submitted(target) or self.workflow.should_run(target)
        ]
        return schedule, set(schedule)

    def schedule(self, targets):
        """Schedule and submit a list of :class:`Target`s.

        For each target a schedule is computed with
        :meth:`_get_schedule_for_target`; targets with an empty schedule are
        skipped. Every scheduled target that is not already submitted
        results in one call to :meth:`submit`.

        :param targets: iterable of targets to schedule.
        """
        for target in targets:
            schedule, scheduled_targets = self._get_schedule_for_target(target)
            if not schedule:
                continue

            logger.info('Scheduling target %s.', target.name)
            for scheduled_target in schedule:
                if self.submitted(scheduled_target):
                    # Fixed: the format string previously had no argument,
                    # so a literal "%s" was logged instead of the name.
                    logger.info(
                        "Target %s is already submitted",
                        scheduled_target.name,
                    )
                    continue

                logger.info("Submitting target %s", scheduled_target.name)
                # Only dependencies that are part of this schedule are
                # forwarded; others are neither submitted nor due to run.
                dependents = [
                    dependent
                    for dependent in scheduled_target.depends_on
                    if dependent in scheduled_targets
                ]
                # NOTE(review): only the dependency list is passed to
                # submit(); ``scheduled_target`` itself is not. Confirm this
                # matches the concrete backends' submit() contract.
                self.submit(dependents)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment