-
-
Save inactivist/56e76ac21975e97a80b9b04de725d0a5 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from collections import defaultdict, OrderedDict | |
import luigi | |
from luigi.task import flatten, getpaths | |
def topological_sorting(struct, outnodes_funct, transform_funct):
    """Walk a graph depth-first and return its nodes in topological order.

    Args:
        struct: The starting node(s). May be a single node, a (nested)
            collection of nodes, or a dict whose *keys* are the nodes; it
            is normalised with ``flatten``.
        outnodes_funct: Callable ``node -> out-nodes``; its result is
            flattened, so it may return a single node, a collection or
            ``None``.
        transform_funct: Callable ``node -> value``. It is invoked exactly
            once per reachable node, and only after it has been invoked for
            every node reachable through that node's out-nodes.

    Returns:
        An ``OrderedDict`` mapping each reachable node to
        ``transform_funct(node)``, ordered so that a node always comes
        before the nodes returned for it by ``outnodes_funct``.

    Raises:
        ValueError: If the graph contains a cycle (previously this ended in
            unbounded recursion).
    """
    # A dict is flattened by its keys because flatten() on a dict would
    # otherwise walk the values.
    roots = flatten(struct.keys()) if isinstance(struct, dict) else flatten(struct)
    visited = OrderedDict()
    # Nodes whose out-nodes are currently being explored; meeting one of
    # them again means we followed a back edge, i.e. a cycle.
    in_progress = set()

    def dvisit(node):
        if node in visited:
            return
        if node in in_progress:
            raise ValueError('Cycle detected at node %r' % (node,))
        in_progress.add(node)
        for outnode in flatten(outnodes_funct(node)):
            dvisit(outnode)
        in_progress.discard(node)
        # Post-order: record the node only once all its out-nodes are done.
        visited[node] = transform_funct(node)

    for root in roots:
        dvisit(root)

    # Reversing the post-order yields the topological order.
    return OrderedDict(reversed(visited.items()))
def to_dag(struct, outnodes_funct):
    """Build the forward and the inverse adjacency mapping of a graph.

    Args:
        struct: Starting node(s), forwarded to ``topological_sorting``.
        outnodes_funct: Callable ``node -> out-nodes`` (result is flattened).

    Returns:
        A ``(dag, inv_dag)`` tuple. ``dag`` is the mapping produced by
        ``topological_sorting`` whose values are each node's flattened
        out-nodes. ``inv_dag`` is a ``defaultdict(list)`` mapping every
        out-node to the nodes that list it as an out-node.
    """
    reverse_edges = defaultdict(list)

    def record_edges(node):
        # Used as the per-node transform: register the reversed edges and
        # hand the forward edges back so they become the node's dag value.
        targets = flatten(outnodes_funct(node))
        for target in targets:
            reverse_edges[target].append(node)
        return targets

    forward_edges = topological_sorting(struct, outnodes_funct, record_edges)
    return forward_edges, reverse_edges
def clear_task_output(task):
    """Remove every output target of ``task`` that currently exists.

    The targets returned by ``task.output()`` are flattened, and each one
    that reports ``exists()`` is deleted through its ``remove()`` method.
    """
    # exists()/remove() is what works for local file targets; add your own
    # per-class notion of 'clear' here for other target types.
    targets = flatten(task.output())
    # The generator is lazy, so each target is checked right before removal.
    for target in (t for t in targets if t.exists()):
        target.remove()
def clear_task_dag_output(struct, dag):
    """Clear the outputs of ``struct`` and of everything reachable in ``dag``.

    Args:
        struct: A task or (nested) collection of tasks to start from; it is
            normalised with ``flatten``.
        dag: Mapping ``task -> adjacent tasks`` that is followed
            recursively. It is indexed with ``dag[task]``, so a plain dict
            raises ``KeyError`` for an unknown task, while a ``defaultdict``
            silently gains an empty entry for it.

    Every reachable task is passed to ``clear_task_output``; the tasks
    reachable from a given task are cleared before that task itself.
    """
    def outnodes_funct(root):
        return dag[root]

    # A single traversal seeded with all roots shares one visited set, so a
    # sub-graph common to several roots is walked and cleared once instead
    # of once per root.
    topological_sorting(flatten(struct), outnodes_funct, clear_task_output)
def task_outnodes_funct(task):
    """Return the flattened result of ``task.requires()``.

    This is the out-node function used to follow a task's requirements.
    """
    requirements = task.requires()
    return flatten(requirements)
class DAG(object):
    """Dependency graph of tasks with helpers to clear their outputs.

    Attributes set by ``__init__``:
        struct: the task(s) the graph was built from.
        dag: mapping of each task to its requirements.
        inv_dag: mapping of each task to the tasks that require it.
    """

    def __init__(self, lasttask):
        # lasttask(s) should be the last task to be executed
        # (no other task depends on it).
        self.struct = lasttask
        self._build()

    def _build(self):
        """Compute ``self.dag`` and ``self.inv_dag`` from ``self.struct``."""
        forward, inverse = to_dag(self.struct, task_outnodes_funct)
        self.dag = forward
        self.inv_dag = inverse

    def clean_backward(self, tasks):
        """Clear ``tasks`` and, recursively, all of their dependencies."""
        return self._clean(tasks, direction='backward')

    def clean_forward(self, tasks):
        """Clear ``tasks`` and, recursively, all tasks that depend on them."""
        return self._clean(tasks, direction='forward')

    def clean_all(self, tasks):
        """Clear ``tasks`` in both the backward and the forward direction."""
        return self._clean(tasks, direction='all')

    def _clean(self, tasks, direction=None):
        """Clear outputs along ``direction``.

        ``direction`` is ``'backward'``, ``'forward'`` or ``'all'``; any
        other value (including the default ``None``) clears nothing.
        """
        # Backward is handled first, then forward; the graph attribute is
        # only looked up when its direction is actually requested.
        for wanted, graph_attr in (('backward', 'dag'), ('forward', 'inv_dag')):
            if direction in ('all', wanted):
                clear_task_dag_output(tasks, getattr(self, graph_attr))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment