Created
August 30, 2016 11:54
-
-
Save dansondergaard/a49aab555abd4809fbcf2a290ef206a7 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class Backend(metaclass=BackendType):
    """Representation of a backend.

    A backend is initialized with an instance of
    :class:`gwf.core.PreparedWorkflow`.

    Subclasses are expected to override :meth:`submitted`, :meth:`running`,
    :meth:`submit` and :meth:`cancel`; the implementations here only raise
    :class:`NotImplementedError`.
    """

    def __init__(self, workflow):
        """Store the prepared workflow this backend operates on.

        :raises WorkflowNotPreparedError:
            if *workflow* is not a :class:`PreparedWorkflow` instance.
        """
        if not isinstance(workflow, PreparedWorkflow):
            raise WorkflowNotPreparedError()
        self.workflow = workflow

    def submitted(self, targets):
        """Return whether the target has been submitted.

        NOTE(review): the parameter is named ``targets``, but every call in
        this class passes a single target -- confirm the intended contract.
        """
        raise NotImplementedError()

    def running(self, targets):
        """Return whether the target is running."""
        raise NotImplementedError()

    def submit(self, targets):
        """Submit a target.

        NOTE(review): :meth:`schedule` calls this with a list of targets.
        """
        raise NotImplementedError()

    def cancel(self, targets):
        """Cancel a target."""
        raise NotImplementedError()

    def _get_schedule_for_target(self, root):
        """Linearize the targets to be run.

        Walks ``dfs(root, self.workflow.dependencies)`` and keeps every
        target that is either already submitted or that the workflow says
        should run.

        :return:
            a ``(schedule, scheduled_targets)`` pair: the list of kept
            targets in traversal order and the same targets as a set. Both
            are empty when *root* itself has already been submitted.
        """
        # If the target is already in the queue we just dismiss the scheduling
        # right away... this because we need to handle dependent nodes in the
        # queue differently, since for those we need to wait for completion.
        if self.submitted(root):
            return [], set()

        schedule = [
            target
            for target in dfs(root, self.workflow.dependencies)
            if self.submitted(target) or self.workflow.should_run(target)
        ]
        return schedule, set(schedule)

    def schedule(self, targets):
        """Schedule and submit a list of :class:`Target` objects.

        For each target a schedule is computed with
        :meth:`_get_schedule_for_target`. Targets with an empty schedule are
        skipped, and so is any scheduled target that has already been
        submitted.
        """
        for target in targets:
            schedule, scheduled_targets = self._get_schedule_for_target(target)
            if not schedule:
                continue

            logger.info('Scheduling target %s.', target.name)
            for scheduled_target in schedule:
                if self.submitted(scheduled_target):
                    # The schedule deliberately includes already-submitted
                    # targets (see _get_schedule_for_target); skip them here.
                    # The name is passed so the "%s" placeholder is filled.
                    logger.info(
                        "Target %s is already submitted",
                        scheduled_target.name,
                    )
                    continue

                logger.info("Submitting target %s", scheduled_target.name)
                # Only dependencies that are part of this schedule are
                # forwarded to the backend.
                dependents = [
                    dependent
                    for dependent in scheduled_target.depends_on
                    if dependent in scheduled_targets
                ]
                # NOTE(review): this submits the in-schedule dependencies,
                # not ``scheduled_target`` itself -- confirm that this is
                # what ``submit`` implementations expect.
                self.submit(dependents)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment