
I'm using the Python pipeline framework luigi together with scikit-learn for machine learning batch jobs, in particular the MiniBatchDictionaryLearning module. It doesn't work as I expect when I execute it with multiple worker processes. My code looks like this (it's just an example):

import luigi
import numpy as np

class First(luigi.Task):

    job_nums = 2  # number of Second tasks to fan out to

    def requires(self):
        return [Second(job_nums=n) for n in range(self.job_nums)]

    def output(self):
        return luigi.LocalTarget("end.txt")

    def run(self):
        with self.output().open("w") as out_:
            pass


class Second(luigi.Task):
    data = np.arange(288000)  # reshaped below into 1000 samples of 288 features
    job_nums = luigi.IntParameter()

    def output(self):
        return luigi.LocalTarget("./dict{0}.npy".format(self.job_nums))

    def run(self):
        from sklearn.decomposition import MiniBatchDictionaryLearning
        dico = MiniBatchDictionaryLearning(n_components=144, n_jobs=1)
        D = dico.fit(np.reshape(self.data, (1000, 288))).components_
        print(D)
        with self.output().open("w") as out_:
            D.dump(out_)

When I execute this code with a single worker, it works.

$ PYTHONPATH="" luigi --module this_is_a_test First

DEBUG: Checking if First() is complete
DEBUG: Checking if Second(job_nums=0) is complete
DEBUG: Checking if Second(job_nums=1) is complete
~~ snip ~~

===== Luigi Execution Summary =====

Scheduled 3 tasks of which:
* 3 ran successfully:
    - 1 First()
    - 2 Second(job_nums=0,1)

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

But when I execute it with multiple worker processes, it fails and I get this error:

$ PYTHONPATH="" luigi --module this_is_a_test First --workers 2
DEBUG: Checking if First() is complete
DEBUG: Checking if Second(job_nums=0) is complete
DEBUG: Checking if Second(job_nums=1) is complete

~~ snip ~~

DEBUG: 2 running tasks, waiting for next task to finish
INFO: [pid 18109] Worker Worker(salt=166214281, workers=2, host=mhigu.local, username=mhigu, pid=18072) running   Second(job_nums=0)
DEBUG: 2 running tasks, waiting for next task to finish
DEBUG: 2 running tasks, waiting for next task to finish
INFO: Worker task Second(job_nums=1) died unexpectedly with exit code -11
INFO: Worker task Second(job_nums=0) died unexpectedly with exit code -11
INFO: Informed scheduler that task   Second(job_nums=1)   has status   FAILED
DEBUG: Asking scheduler for work...
INFO: Done
INFO: There are no more tasks to run at this time
INFO: Second(job_nums=0) is currently run by worker Worker(salt=166214281, workers=2, host=mhigu.local, username=mhigu, pid=18072)
INFO: Worker task Second(job_nums=0) died unexpectedly with exit code -11
INFO: Informed scheduler that task   Second(job_nums=0)   has status   FAILED
DEBUG: Asking scheduler for work...
INFO: Done
INFO: There are no more tasks to run at this time
INFO: There are 1 pending tasks possibly being run by other workers
INFO: There are 1 pending tasks unique to this worker
INFO: Worker Worker(salt=166214281, workers=2, host=mhigu.local, username=mhigu, pid=18072) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 3 tasks of which:
* 2 failed:
    - 2 Second(job_nums=0,1)
* 1 were left pending, among these:
    * 1 had failed dependencies:
        - 1 First()

This progress looks :( because there were failed tasks

===== Luigi Execution Summary =====
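
As far as I understand, a negative exit code from a multiprocessing child means the process was killed by that signal number, so -11 looks like SIGSEGV, i.e. a segmentation fault inside the worker process. A quick check of that assumption, outside luigi:

import multiprocessing as mp
import os
import signal

def crash():
    # Kill the child with SIGSEGV on purpose, just to see how the exit code is reported.
    os.kill(os.getpid(), signal.SIGSEGV)

if __name__ == "__main__":
    p = mp.Process(target=crash)
    p.start()
    p.join()
    print(p.exitcode)  # prints -11: "-N" means the child died from signal N (signal.SIGSEGV == 11)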

I checked the stack trace and found that something goes wrong inside the scikit-learn fit method, but I couldn't figure out the exact cause.
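
To narrow it down further, something like the sketch below should show whether the crash also happens when the same fit() runs inside plain multiprocessing children, with luigi taken out of the picture entirely (I'm assuming here that luigi's --workers mode forks one child process per task, which is my reading of the logs above):

import multiprocessing as mp
import numpy as np

def fit_once(n):
    # Same data and parameters as in Second.run(), but without any luigi machinery.
    from sklearn.decomposition import MiniBatchDictionaryLearning
    data = np.arange(288000).reshape(1000, 288)
    dico = MiniBatchDictionaryLearning(n_components=144, n_jobs=1)
    D = dico.fit(data).components_
    print(n, D.shape)

if __name__ == "__main__":
    procs = [mp.Process(target=fit_once, args=(n,)) for n in range(2)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
        print("exit code:", p.exitcode)  # -11 here would point at scikit-learn under fork rather than at luigi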

Could you tell me how I can fix this problem?

  • http://stackoverflow.com/questions/32978429/python-luigi-died-unexpectedly-with-exit-code-11 Here's a similar problem, but with a different ML library. Wish I knew how to fix it. Could it be something to do with process ID assignment when scikit-learn spawns new instances on each thread? It may be that it fails when it tries to do that in parallel with multiple workers. – Charlie Haley Dec 21 '15 at 20:08
  • It seems like a very similar problem. In my case, the processing stops in scikit-learn's safe_sparse_dot(a, b) in extmath.py, but I couldn't figure out what exactly is happening... – mhigu Dec 22 '15 at 02:46
  • I have dealt with this problem using Python's multiprocessing module, calling the luigi task directly from each invoked process (roughly as sketched below). It definitely works, but I can't check how many tasks remain from the luigi user interface, which is a little inconvenient. I wish someone would find a resolution for this problem... – mhigu Dec 25 '15 at 06:38
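
For reference, a rough sketch of the workaround described in the last comment (the module name this_is_a_test and the one-task-per-process layout are my assumptions, so treat this as an illustration rather than the exact code used):

import multiprocessing as mp
import luigi
from this_is_a_test import First, Second

def run_one(n):
    # Run a single Second task in its own process with a local scheduler.
    luigi.build([Second(job_nums=n)], local_scheduler=True, workers=1)

if __name__ == "__main__":
    procs = [mp.Process(target=run_one, args=(n,)) for n in range(2)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    # Once the per-task outputs exist, First() just picks them up.
    luigi.build([First()], local_scheduler=True)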
