
I have two consecutive functions that process large lists.

I call them one after another, using joblib's Parallel and delayed, to speed up each function individually.

However, as soon as Parallel calls function_2, I see output from function_1 instead, and I don't understand why. In short, function_2 never gets called.

The main code:

from mycode import function_1, function_2
from joblib import Parallel, delayed
import gc

if __name__ == '__main__':
    list = list_1
    print ">>> First call"
    Parallel(n_jobs=-1)(delayed(function_1)(item) for item in list)
    gc.collect()
    do_other_stuff()
    list = list_2
    print ">>> Second call"
    Parallel(n_jobs=-1, backend='threading')(delayed(function_2)(item) for item in list)

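The parallelized functions:

def function_1(item): # Gets called first
    print "this comes from function 1"
    pass  # placeholder: the real code writes its result to a CSV file

def function_2(item): # Gets called second
    print "this comes from function 2"
    pass  # placeholder: the real code writes its result to a CSV file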

Output:

>>> First call
this comes from function 1
this comes from function 1
this comes from function 1
this comes from function 1
>>> Second call
this comes from function 1
this comes from function 1
this comes from function 1
this comes from function 1

My hypothesis is that some parts of function_1 remain in memory after it has been called (possibly due to joblib's memory mapping / sharing feature?).

This is why I call gc.collect() between the calls. Since that doesn't help, I'm considering reloading modules between the calls (joblib, Parallel, delayed), which seems ugly.

Has anyone experienced similar behavior (on Windows)?

Is there a fix?

Do I need to unload/reload the joblib or mycode modules between the Parallel steps, and if so, why?

edited by user3666197, asked by sudonym
  • How long is `list_1`/`list_2`? (also, side-note: Never name-shadow the `list` constructor, it will bite you eventually). Need to know how many outputs you were expecting in the first place. For the record, your explanation is almost certainly wrong unless `joblib` is the world's most amateur library ever. – ShadowRanger Feb 07 '18 at 14:02
  • list_1 has about 1,000 items, list_2 about 10,000. In the real code, the list variables are named differently. Also, instead of the pass, both functions write their output to disk as CSV (nothing is returned). – sudonym Feb 07 '18 at 14:10
  • Are you sure you never get output from the second function? Looks like in the process of minimizing your example, you forgot the complete and verifiable parts of a [MCVE]. – ShadowRanger Feb 07 '18 at 15:57
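To illustrate the side-note about name-shadowing `list`, here is a minimal Python 3 sketch (the values are made up for illustration): once a module rebinds the name `list`, every later `list(...)` call in that module hits the rebound object instead of the built-in type.

```python
import builtins

list = [1, 2, 3]  # rebinding the name hides the built-in `list` type here

try:
    list("abc")  # `list` is now a list object, so calling it fails
except TypeError as exc:
    error_message = str(exc)
    print(error_message)  # 'list' object is not callable

# The built-in is still reachable explicitly, but every later call in this
# module would need the same workaround; renaming the variable is simpler.
letters = builtins.list("abc")
print(letters)  # ['a', 'b', 'c']
```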

2 Answers

Short version:

Q1: Did anyone experience similar behavior (on windows)?
A1: No.

Q2: Is there some fix?
A2: No, ref. A1.

Q3: Do I need to un/reload joblib or mycode modules here ...?
A3: No, ref. A1.

Q4: if so ( Q3 ), why?
A4: N/A, ref. A3.


Let's base our efforts on a common MCVE formulation:

A slightly modified structure of the experiment may look like this:

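# from mycode import function_2   # not needed: both functions are defined below
from joblib import Parallel, delayed  # originally sklearn.externals.joblib, which has since been removed
# import gc                        # not needed for this test
import itertools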

def function_1( aParam = "" ):                                          # Gets called first
    try:
         print "this comes from a call: function_1( aParam == {0:})".format( aParam )

    except:
         pass # die in silence

    finally:
         return aParam

def function_2( aParam = "" ):                                          # Gets called second
    try:
         print "this comes from a call: function_2( aParam == {0:})".format( aParam )

    except:
         pass # die in silence

    finally:
         return aParam

if __name__ == '__main__':
   print "-------------------------------------------------------------- vvv main.START()"
   # list = list_1
   aList = [ 11, 12, 13, 14, 15, ]
   print "-------------------------------------------------------------- >>> First call"
   A = Parallel(                                               n_jobs = -1
                 )( delayed( function_1 ) ( item ) for item in aList
                    )
   print "-------------------------------------------------------------- vvv main was ret'd: {0:}".format( repr( A ) )
   # gc.collect()
   # do_other_stuff()
   # list = list_2
   aList = [ 21, 22, 23, 24, 25, ]
   print "-------------------------------------------------------------- >>> Second call"
   B = Parallel(                                               n_jobs  = -1,
                                                               backend = 'threading'
                 )( delayed( function_2 ) ( item ) for item in aList
                    )
   print "-------------------------------------------------------------- vvv main was ret'd: {0:}".format( repr( B ) )

Result:

C:\Python27.anaconda>python TEST_SO_Parallel.py
-------------------------------------------------------------- vvv main.START()
-------------------------------------------------------------- >>> First call
this comes from a call: function_1( aParam == 11)
this comes from a call: function_1( aParam == 12)
this comes from a call: function_1( aParam == 13)
this comes from a call: function_1( aParam == 14)
this comes from a call: function_1( aParam == 15)
-------------------------------------------------------------- vvv main was ret'd: [11, 12, 13, 14, 15]
-------------------------------------------------------------- >>> Second call
this comes from a call: function_2( aParam == 21)
this comes from a call: function_2( aParam == 22)
this comes from a call: function_2( aParam == 23)
 this comes from a call: function_2( aParam == 25)this comes from a call: function_2( aParam == 24)

-------------------------------------------------------------- vvv main was ret'd: [21, 22, 23, 24, 25]

Assessment:

As observed, Python 2.7 on Windows processed the code without any of the problems reported above.

joblib processed both calls exactly as documented.

The reported behaviour could not be reproduced, much less traced to any joblib involvement in the cause-effect chain.
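For readers without joblib at hand, a rough standard-library analogue of the same two-step experiment may look like the sketch below. It is not joblib: `run_in_pool` is a hypothetical helper, and it uses `concurrent.futures.ThreadPoolExecutor` for both steps (joblib's first call above uses worker processes instead). Each pool runs only the function it was handed, and `Executor.map` returns results in input order, just as `Parallel` did. Interleaved printed lines, like the `function_2` line for 24 and 25 above, come from several threads writing to stdout at the same time and are harmless.

```python
from concurrent.futures import ThreadPoolExecutor


def function_1(a_param=""):
    # Stand-in for the MCVE's function_1: report the call, return the input
    print("this comes from a call: function_1( aParam == {0})".format(a_param))
    return a_param


def function_2(a_param=""):
    # Stand-in for the MCVE's function_2
    print("this comes from a call: function_2( aParam == {0})".format(a_param))
    return a_param


def run_in_pool(func, items, max_workers=4):
    # Hypothetical helper: map func over items on a thread pool;
    # Executor.map yields results in the order of the inputs.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(func, items))


a = run_in_pool(function_1, [11, 12, 13, 14, 15])  # first call
b = run_in_pool(function_2, [21, 22, 23, 24, 25])  # second call
print(a)  # [11, 12, 13, 14, 15]
print(b)  # [21, 22, 23, 24, 25]
```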

answered by user3666197

I had the same issue.

My code looked like:

A = Parallel(n_jobs=1)(delayed(self.function_1)( df_1, item ) for item in list_of_items)

B = Parallel(n_jobs=1)(delayed(self.function_2)( df_2, item ) for item in list_of_items)

where list_of_items had 2 items.

But the output was:

[Parallel(n_jobs=1)]: Done   2 out of   2 | elapsed:   32.2s finished
[Parallel(n_jobs=1)]: Done   0 out of   0 | elapsed:    0.0s finished

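The reason the second Parallel call didn't run (at least in my case) was that my list_of_items was a generator instead of a list! A generator can only be iterated once, so the first Parallel call consumed all of its items and the second one received 0 items.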
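The generator pitfall can be reproduced without joblib. In this minimal sketch, `run_all`, `function_1` and `function_2` are hypothetical stand-ins for `Parallel` and the answer's methods; the only assumption is that, like `Parallel`, the runner iterates over its input once. The second pass over the same generator yields nothing, matching the `Done 0 out of 0` line above, while a real list can be iterated again.

```python
def run_all(func, items):
    # Hypothetical stand-in for Parallel(...)(delayed(func)(item) for item in items):
    # it simply iterates over whatever it is given, once.
    return [func(item) for item in items]


def function_1(item):
    return ("f1", item)


def function_2(item):
    return ("f2", item)


# Bug: list_of_items is a generator, so it can only be iterated once.
list_of_items = (name for name in ["a", "b"])
gen_first = run_all(function_1, list_of_items)   # consumes both items
gen_second = run_all(function_2, list_of_items)  # generator is already exhausted
print(gen_first)   # [('f1', 'a'), ('f1', 'b')]
print(gen_second)  # []

# Fix: materialize the items once as a real list, which can be re-iterated.
list_of_items = list(name for name in ["a", "b"])
list_first = run_all(function_1, list_of_items)
list_second = run_all(function_2, list_of_items)
print(list_second)  # [('f2', 'a'), ('f2', 'b')]
```

`itertools.tee` can also split one iterator into several, but when each copy is consumed in full, one after the other, a plain `list` is simpler.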

I hope this solves your problem too. :)