In this simple case you can just modify the preprocess function to accept a threshold argument. Something like:
def preprocess(data, threshold):
    def predicate(x):
        return x < threshold
    return old_preprocess(data, predicate)
Now in your preds list you can simply put plain integers, which are picklable:

preds = [1, 2, 3, 42]
pool = Pool(processes=4)
pool.starmap(preprocess, zip(data, preds))

Note the use of pool.starmap (available since Python 3.3): it unpacks each (data, threshold) pair into the two arguments of preprocess, whereas plain pool.map would pass the tuple as a single argument.
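Putting the pieces together, here's a minimal runnable sketch of this first approach. old_preprocess is a hypothetical stand-in (here it just filters the data with the predicate), since the original isn't shown:

```python
from multiprocessing import Pool

# Hypothetical stand-in for the original preprocess logic:
# apply the predicate as a filter over the data.
def old_preprocess(data, predicate):
    return [x for x in data if predicate(x)]

def preprocess(data, threshold):
    def predicate(x):
        return x < threshold
    return old_preprocess(data, predicate)

if __name__ == "__main__":
    datasets = [[0, 1, 2, 3]] * 4   # one dataset per predicate
    preds = [1, 2, 3, 42]           # plain ints: picklable
    with Pool(processes=4) as pool:
        # starmap unpacks each (data, threshold) pair into two arguments
        results = pool.starmap(preprocess, zip(datasets, preds))
    print(results)
```

Only the integers cross the process boundary; the closure over threshold is rebuilt inside each worker.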
You can extend it to choose the comparison operator by using the operator module:

def preprocess(data, pred):
    threshold, op = pred
    def predicate(x):
        return op(x, threshold)
    return old_preprocess(data, predicate)

import operator as op
preds = [(1, op.lt), (2, op.gt), (3, op.ge), (42, op.lt)]
pool = Pool(processes=4)
pool.starmap(preprocess, zip(data, preds))
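This works because the functions in the operator module are builtins, which pickle by reference rather than by value, unlike lambdas. A quick check of that claim:

```python
import operator as op
import pickle

# Builtin functions like op.ge pickle by reference, so a
# (threshold, operator) tuple can cross process boundaries.
pred = (3, op.ge)
threshold, func = pickle.loads(pickle.dumps(pred))
print(func(5, threshold))  # 5 >= 3 -> True
```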
To extend it with arbitrary predicates things get harder. Probably the easiest way to do this is to use the marshal module, which is able to convert the code of a function into a bytes object and back. Something like:
real_preds = [marshal.dumps(pred.__code__) for pred in preds]
And then preprocess should re-build the predicate function from the marshalled code:

import types

def preprocess(data, pred):
    pred = types.FunctionType(marshal.loads(pred), globals())
    return old_preprocess(data, pred)
Here's a MWE for this last suggestion:
>>> from multiprocessing import Pool
>>> import marshal
>>> import types
>>> def preprocess(pred):
... pred = types.FunctionType(marshal.loads(pred), globals())
... return pred(2)
...
>>> preds = [lambda x: x < 1,
...          lambda x: x < 2,
...          lambda x: x < 3,
...          lambda x: x < 42]
>>> real_preds = [marshal.dumps(pred.__code__) for pred in preds]
>>> pool = Pool(processes=4)
>>> pool.map(preprocess, real_preds)
[False, False, True, True]
Note that the arguments to pool.map must be picklable, which means you cannot pass a lambda as the first argument to Pool.map:
>>> pool.map(lambda x: preprocess(x), real_preds)
Exception in thread Thread-5:
Traceback (most recent call last):
  File "/usr/lib/python3.3/threading.py", line 639, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.3/threading.py", line 596, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.3/multiprocessing/pool.py", line 351, in _handle_tasks
    put(task)
  File "/usr/lib/python3.3/multiprocessing/connection.py", line 206, in send
    ForkingPickler(buf, pickle.HIGHEST_PROTOCOL).dump(obj)
_pickle.PicklingError: Can't pickle <class 'function'>: attribute lookup builtins.function failed
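If you do need that extra level of indirection, a module-level def (or a functools.partial over one) is a picklable substitute for the lambda. A sketch, with apply_pred as a hypothetical wrapper name:

```python
import marshal
import pickle
import types
from functools import partial

# A module-level def pickles by reference, so unlike a lambda it can be
# passed to Pool.map directly.
def apply_pred(code_bytes, arg=2):
    pred = types.FunctionType(marshal.loads(code_bytes), globals())
    return pred(arg)

# functools.partial over a top-level def is picklable too, so it can
# replace something like `lambda x: apply_pred(x, arg=10)`:
bound = partial(apply_pred, arg=10)
restored = pickle.loads(pickle.dumps(bound))

code = marshal.dumps((lambda x: x < 5).__code__)
print(apply_pred(code), restored(code))  # True False
```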
Regarding the "is Pool.map the right tool?" question: I believe it highly depends on the size of the data. Using multiprocessing adds quite a lot of overhead, so even if you "make it work" there's a good chance it isn't worth it. In particular, your edited question contains a more "real world" predicate:
lambda w: (w.lower() not in stopwords.words('english') and re.search("[a-z]", w.lower()))
I believe that this predicate doesn't take enough time to make Pool.map worth using. Obviously it depends on the size of w and the number of elements to map.
Doing some quick tests with this predicate, I see that Pool.map starts to become faster than a plain map when w is around 35000 characters long. If w is shorter than 1000 characters, Pool is about 15 times slower than a plain map (testing with 256 strings to check; with 60000-character strings Pool is a bit faster).
Notice that if w is quite long, it is worth using a def instead of a lambda to avoid computing w.lower() twice; this holds whether you use a plain map or Pool.map.
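For instance (a sketch; STOPWORDS is a small stand-in set, since nltk's stopwords.words('english') may not be available):

```python
import re

# Stand-in for stopwords.words('english'); a set makes membership tests O(1).
STOPWORDS = {"the", "a", "an", "and"}

# A def lets us lowercase w once, unlike the lambda that
# calls w.lower() twice.
def keep(w):
    lw = w.lower()
    return lw not in STOPWORDS and re.search("[a-z]", lw) is not None

print([w for w in ["The", "Word", "42", "and"] if keep(w)])  # ['Word']
```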