I have a pipeline F -> M -> S, where F, M and S are Luigi tasks. I call Luigi with task S; S requires M, and M requires F. Sometimes, however, M requires D instead, and other times B. F, D and B are completely different tasks, but the output of any of them can be used by M. Today we have three options, and the number will grow over time. Do I have to change all the tasks every time a new option is added to the pipeline (for instance, we now have the code for task E, which M will require in some runs but not in others), or can I pass the task as a parameter? If so, how?
1 Answer
What you want is probably Luigi's dynamic dependencies. I would write something like this:
    import luigi

    class M(luigi.Task):
        def requires(self):
            # Keep this empty, or list only the dependencies that are always required.
            return []

        def run(self):
            # Evaluate the condition for each dynamic dependency and yield the one needed.
            # Luigi runs the yielded task first and hands its output back to run().
            if condition_for_task_F:
                s = yield TaskF()
            elif ...:
                ...
            else:
                s = yield TaskE()
As for whether you can pass a task as a parameter: it might be possible, but it should be avoided at all costs, because it breaks the idempotence principle and is bad programming practice. A better way is to decouple the dependency logic into a separate task, like this:
F,D,B,E -> Y -> M -> S
where Y just calculates the dependencies and pushes them to M.

sahil shetye