Created
March 29, 2021 09:35
-
-
Save leifdenby/88d12536f0eb0a914c81cc8b3554eae6 to your computer and use it in GitHub Desktop.
Consumer task with dynamic number of requirements
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import luigi
def pack(cls, **kwargs):
    """Describe a task as a plain dict so it can be passed as a parameter.

    Only the class *name* is recorded, so the description can later be
    turned back into an instance by ``unpack``, which looks the name up in
    this module's globals.

    Args:
        cls: The class to describe; only ``cls.__name__`` is stored.
        **kwargs: Keyword arguments the class should be instantiated with.

    Returns:
        A dict with the keys ``"class_name"`` and ``"kwargs"``.
    """
    return {"class_name": cls.__name__, "kwargs": kwargs}
def unpack(class_name, kwargs):
    """Instantiate the class named ``class_name`` with ``kwargs``.

    The class is looked up by name in this module's globals, so it must be
    defined in (or imported into) this module.

    Args:
        class_name: Name of a class available in this module's globals.
        kwargs: Mapping of keyword arguments passed to the class.

    Returns:
        A new instance of the named class.

    Raises:
        KeyError: If no global called ``class_name`` exists in this module.
    """
    try:
        task_class = globals()[class_name]
    except KeyError:
        # Same exception type as a plain dict lookup, but with a message
        # that says what was being looked up and where.
        raise KeyError(
            f"no class named {class_name!r} is defined in module {__name__!r}"
        ) from None
    return task_class(**kwargs)
class ConsumerTask(luigi.Task):
    """Task whose requirements are supplied at construction time.

    The upstream tasks are not hard-coded: they are passed in as a list of
    task descriptions and rebuilt into task instances in ``requires``.
    """

    # List of task descriptions, each a mapping with the keys "class_name"
    # and "kwargs" (the shape produced by ``pack``).
    source_tasks = luigi.ListParameter()

    def requires(self):
        """Return one task instance per entry in ``source_tasks``."""
        return [unpack(**task_kwargs) for task_kwargs in self.source_tasks]

    def run(self):
        """Drop into an interactive debugger (placeholder for real work)."""
        # Prefer ipdb when it is installed, but do not fail with an
        # ImportError when it is not: fall back to the stdlib debugger.
        try:
            import ipdb
        except ImportError:
            import pdb

            pdb.set_trace()
        else:
            ipdb.set_trace()
class ProviderTask(luigi.Task):
    """Write a small text file whose name and content derive from ``name``."""

    # Identifier used in both the output file name and the file content.
    name = luigi.Parameter()

    def run(self):
        """Write ``42 <name>`` to the output target."""
        # Write through the target's own open() rather than the builtin
        # open() on the deprecated ``.fn`` attribute: the target writes to a
        # temporary file and moves it into place on close, so an interrupted
        # run does not leave a partial file that looks like finished output.
        with self.output().open("w") as fh:
            fh.write(f"42 {self.name}")

    def output(self):
        """Return the local file target ``data_<name>.txt``."""
        return luigi.LocalTarget(f"data_{self.name}.txt")
class RunAll(luigi.WrapperTask):
    """Entry-point task: one ``ConsumerTask`` fed by two ``ProviderTask``s."""

    def requires(self):
        """Return a ``ConsumerTask`` that depends on providers "foo" and "bar".

        The providers are passed as packed descriptions (class name plus
        keyword arguments) rather than as task instances, because they
        travel through the consumer's ``source_tasks`` list parameter.
        """
        return ConsumerTask(
            source_tasks=[
                pack(ProviderTask, name="foo"),
                pack(ProviderTask, name="bar"),
            ]
        )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment