I have a luigi task that performs some non-stable computations. Think of an optimization process that sometimes does not converge.

import luigi

class MyOptimizer(luigi.Task):
    input_param = luigi.Parameter()
    output_filename = luigi.Parameter(default='result.json')

    def run(self):
        optimize_something(self.input_param, self.output().path)

    def output(self):
        return luigi.LocalTarget(self.output_filename)

Now I would like to build a wrapper task that will run this optimizer several times, with different input parameters, and will take the output of the first run that converged.

My current implementation does not use MyOptimizer at all: if one instance fails, luigi treats the wrapper task as failed too, whereas I am fine with some instances of MyOptimizer failing.

class MyWrapper(luigi.Task):
    input_params_list = luigi.ListParameter()
    output_filename = luigi.Parameter(default='result.json')

    def run(self):
        for input_param in self.input_params_list:
            try:
                optimize_something(input_param, self.output().path)
                print(f"Optimizer succeeded with input {input_param}")
                break
            except Exception as e:
                print(f"Optimizer failed with input {input_param}. Trying again...")

    def output(self):
        return luigi.LocalTarget(self.output_filename)

The problem is that the runs are not parallelized this way. Also, MyOptimizer and optimize_something are complex and take part in the rest of the data pipeline handled by luigi, which makes my code pretty chaotic.

I would appreciate any insights and ideas on how to make this work in a luigi-like fashion :)

DalyaG

1 Answer


Can you make it so that your optimizer always writes something out, even if it is just an empty file that signifies failure but looks like a successful output to luigi? Also, include input_param in MyOptimizer's output filename to make the filenames unique.
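
A minimal sketch of the "always write something" idea, using only the standard library so it can be tried outside luigi. It assumes the optimizer is a callable that returns a JSON-serializable result and raises when it does not converge; the helper names `output_path_for` and `run_and_record` and the `converged` field are made up for illustration and are not part of luigi or of the question's code.

```python
import json
from pathlib import Path


def output_path_for(input_param, directory="."):
    """Build a per-parameter filename so parallel runs do not overwrite each other."""
    return Path(directory) / f"result_{input_param}.json"


def run_and_record(optimize, input_param, path):
    """Run `optimize` and always write a JSON file, whether or not it converged.

    Assumption: `optimize(input_param)` returns a JSON-serializable result
    and raises an exception when the optimization does not converge.
    """
    try:
        record = {
            "converged": True,
            "input_param": input_param,
            "result": optimize(input_param),
        }
    except Exception as exc:  # a failed run is recorded instead of raised
        record = {
            "converged": False,
            "input_param": input_param,
            "error": str(exc),
        }
    Path(path).write_text(json.dumps(record))
    return record
```

Inside `MyOptimizer`, `output()` could return a `luigi.LocalTarget` built from `output_path_for(self.input_param)`, and `run()` could call `run_and_record`. Note that catching `Exception` this broadly also hides genuine bugs, so you may want to catch only the error your optimizer raises on non-convergence.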

Then:

class MyWrapper(luigi.Task):
    input_params_list = luigi.ListParameter()
    output_filename = luigi.Parameter(default='result.json')

    def run(self):
        task_list = [MyOptimizer(input_param) for input_param in self.input_params_list]
        # Dynamic dependencies: luigi can run these in parallel when it has multiple workers
        targets = yield task_list
        for target in targets:
            # ... do something to read and compare outputs
            some_data = some_read(target.path)
        # ... write optimal solution

    def output(self):
        return luigi.LocalTarget(self.output_filename)
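
For the "read and compare outputs" step, here is one possible sketch that picks the first converged run, which is what the question asks for. It assumes each optimizer output is either an empty file (a failed run) or a JSON object with a boolean `converged` field; that file layout and the helper name `first_converged` are assumptions for illustration only.

```python
import json
from pathlib import Path


def first_converged(paths):
    """Return the first record flagged as converged, or None if every run failed."""
    for path in paths:
        text = Path(path).read_text()
        if not text.strip():
            continue  # an empty file marks a failed run
        record = json.loads(text)
        if record.get("converged"):
            return record
    return None
```

In the wrapper's `run()`, this could be called as `first_converged(target.path for target in targets)`, and the returned record written to `self.output().path`. If it returns `None`, raising an exception there makes the wrapper fail only when no run converged.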
hotplasma