I have a requirement where the DAG looks like the following:
                   -> Task C -> Task D
                  |
Task A -> Task B -
                  |
                   -> Task E -> Task F
Task B has a Python ternary operator that decides at runtime which leg to execute, i.e. the runtime flow will be either (Task C, Task D) or (Task E, Task F).
How can we define this in the requires function of a luigi task? Any idea or code snippet would be appreciated.
You can define a dynamic dependency. I think in your example you'd have a task G that would depend on B and, based on the output of B, yield C and D or E and F.
import luigi

class GTask(luigi.Task):
    def requires(self):
        return BTask()

    def run(self):
        # check_b_output is a placeholder for your own check on B's output
        if check_b_output(self.input()):
            yield CTask()
            yield DTask()
        else:
            yield ETask()
            yield FTask()