I'm using the Python pipeline framework Luigi together with scikit-learn (specifically the MiniBatchDictionaryLearning module) for machine learning batch jobs. However, it doesn't work as expected when I run it with multiple worker processes. My code looks like this (it's just an example):
```python
import luigi
import numpy as np


class First(luigi.Task):
    job_nums = 2

    def requires(self):
        # One Second task per job number
        return [Second(job_nums=n) for n in range(self.job_nums)]

    def output(self):
        return luigi.LocalTarget("end.txt")

    def run(self):
        with self.output().open("w") as out_:
            pass


class Second(luigi.Task):
    data = np.arange(288000)
    job_nums = luigi.IntParameter()

    def output(self):
        # Nop format opens the target in binary mode, which ndarray.dump needs
        return luigi.LocalTarget("./dict{0}.npy".format(self.job_nums),
                                 format=luigi.format.Nop)

    def run(self):
        from sklearn.decomposition import MiniBatchDictionaryLearning
        dico = MiniBatchDictionaryLearning(n_components=144, n_jobs=1)
        # 288000 values reshaped into 1000 samples of 288 features each
        D = dico.fit(np.reshape(self.data, (1000, 288))).components_
        print(D)
        with self.output().open("w") as out_:
            D.dump(out_)
```
When I run this code with a single worker process, it works:
```
$ PYTHONPATH="" luigi --module this_is_a_test First
DEBUG: Checking if First() is complete
DEBUG: Checking if Secound(job_nums=0) is complete
DEBUG: Checking if Secound(job_nums=1) is complete
~~ snip ~~
===== Luigi Execution Summary =====
Scheduled 3 tasks of which:
* 3 ran successfully:
    - 1 First()
    - 2 Secound(job_nums=0,1)
This progress looks :) because there were no failed tasks or missing external dependencies
===== Luigi Execution Summary =====
```
But when I run it with multiple worker processes, it fails with the following error:
```
$ PYTHONPATH="" luigi --module this_is_a_test First --workers 2
DEBUG: Checking if First() is complete
DEBUG: Checking if Secound(job_nums=0) is complete
DEBUG: Checking if Secound(job_nums=1) is complete
~~ snip ~~
DEBUG: 2 running tasks, waiting for next task to finish
INFO: [pid 18109] Worker Worker(salt=166214281, workers=2, host=mhigu.local, username=mhigu, pid=18072) running Secound(job_nums=0)
DEBUG: 2 running tasks, waiting for next task to finish
DEBUG: 2 running tasks, waiting for next task to finish
INFO: Worker task Secound(job_nums=1) died unexpectedly with exit code -11
INFO: Worker task Secound(job_nums=0) died unexpectedly with exit code -11
INFO: Informed scheduler that task Secound(job_nums=1) has status FAILED
DEBUG: Asking scheduler for work...
INFO: Done
INFO: There are no more tasks to run at this time
INFO: Secound(job_nums=0) is currently run by worker Worker(salt=166214281, workers=2, host=mhigu.local, username=mhigu, pid=18072)
INFO: Worker task Secound(job_nums=0) died unexpectedly with exit code -11
INFO: Informed scheduler that task Secound(job_nums=0) has status FAILED
DEBUG: Asking scheduler for work...
INFO: Done
INFO: There are no more tasks to run at this time
INFO: There are 1 pending tasks possibly being run by other workers
INFO: There are 1 pending tasks unique to this worker
INFO: Worker Worker(salt=166214281, workers=2, host=mhigu.local, username=mhigu, pid=18072) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====
Scheduled 3 tasks of which:
* 2 failed:
    - 2 Secound(job_nums=0,1)
* 1 were left pending, among these:
    * 1 had failed dependencies:
        - 1 First()
This progress looks :( because there were failed tasks
===== Luigi Execution Summary =====
```
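For reference, Python's `multiprocessing` reports a negative exit code `-N` when a child process is terminated by signal `N`, so `-11` corresponds to signal 11, `SIGSEGV` (a segmentation fault) on POSIX systems. A small helper to decode such codes might look like this (the function name `describe_exit_code` is just illustrative):

```python
import signal


def describe_exit_code(code):
    """Translate a multiprocessing-style exit code into a readable string."""
    # Negative codes mean the child was killed by the signal with that number
    if code is not None and code < 0:
        return "killed by " + signal.Signals(-code).name
    return "exited with status {0}".format(code)


print(describe_exit_code(-11))  # killed by SIGSEGV
```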
I checked the stack trace and found that something goes wrong inside scikit-learn's `fit` method, but I couldn't pin down the exact cause.
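With `--workers 2`, Luigi runs each task in its own child process (the log shows `[pid 18109]` running a task for the worker with `pid=18072`). To check whether the crash depends on Luigi or only on calling `fit` in a forked child, a minimal reproduction outside Luigi could look like the sketch below. It is only a sketch under assumptions: the random data and smaller sizes stand in for the original 1000 × 288 array, the helper names are hypothetical, and forcing the `"fork"` start method (POSIX only) is an assumption about how the worker processes are created.

```python
import multiprocessing as mp

import numpy as np
from sklearn.decomposition import MiniBatchDictionaryLearning


def fit_components(seed=0):
    # Hypothetical stand-in for Second.run(): random data, smaller sizes
    rng = np.random.RandomState(seed)
    X = rng.rand(200, 64)
    dico = MiniBatchDictionaryLearning(n_components=16, random_state=seed)
    return dico.fit(X).components_.shape


def child():
    # Runs the same fit inside a forked child process
    fit_components()


if __name__ == "__main__":
    # Fit once in the parent first, then again in a forked child
    print("parent:", fit_components())
    ctx = mp.get_context("fork")
    p = ctx.Process(target=child)
    p.start()
    p.join()
    print("child exit code:", p.exitcode)
```

If the parent fit succeeds but the child reports a negative exit code such as `-11`, the crash is reproducible without Luigi and is tied to running the fit in a forked process.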
Could you tell me how I can fix this problem?