In Luigi, I have a task where I want to dynamically generate a list of dependencies based on the output from another upstream task. For example:
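```python
import luigi

class TaskA(luigi.Task):
    param = luigi.IntParameter()

class TaskB(luigi.Task):
    def run(self):
        pass

    def output(self):
        # Simplified: a real task would return a Target here
        return [1, 2, 3, 4]

class TaskC(luigi.Task):
    def requires(self):
        # Needs TaskB's output before TaskB has had a chance to run
        return [TaskB()] + [TaskA(param=p) for p in TaskB().output()]
```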
In summary, I am creating a set of TaskA dependencies in TaskC, based on the output from TaskB.
I have tried a few things, but Luigi seems to get confused: TaskB really needs to run before TaskC can return its list of dependencies, yet Luigi cannot run anything until it has called TaskC.requires().
Is there any way to make this work and accomplish what I am trying to do here?
In my real-life scenario, the implementations of these tasks are much more complex, but this is the gist of how the tasks are connected.
This is a great question, and Luigi provides a direct solution for it: dynamic dependencies, which are covered in the docs here: https://luigi.readthedocs.io/en/stable/tasks.html#dynamic-dependencies
Basically, TaskC should require only TaskB, and then, inside its run() method, yield the new TaskA tasks built from TaskB's output. Let me show you in an example:
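```python
import luigi

class TaskC(luigi.Task):
    def requires(self):
        return TaskB()

    def run(self):
        # Called only after TaskB is complete; self.input() is TaskB().output()
        yield [TaskA(param=p) for p in self.input()]
```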