
Setup: I have a function preprocess(data, predicate) and a list of predicates that might look like this:

preds = [lambda x: x < 1,
         lambda x: x < 2,
         lambda x: x < 3,
         lambda x: x < 42]

EDIT: I probably should have been more precise; I thought 1, 2, 3, 42 were obviously identifiable as examples, but it seems that was too implicit. Actually I'm doing some NLP: data is a list of words, and one predicate looks like lambda w: (w.lower() not in stopwords.words('english') and re.search("[a-z]", w.lower())). I want to test different predicates to evaluate which performs best.

Here is what I actually want to do: call preprocess with every predicate, in parallel.

EDIT: Because this is a preprocessing step, I need what is being returned by preprocess so I can continue to work with it.

What I hoped I can do but sadly can't:

pool = Pool(processes=4)
pool.map(lambda p: preprocess(data, p), preds)

As far as I understand, this is because everything passed to pool.map has to be picklable. In this question two solutions are suggested, of which the first (the accepted answer) seems impractical and the second doesn't seem to work in Python 2.7, which I'm using, even though pythonic metaphor suggests in the comments that it does.

My question is whether or not pool.map is the right way to go, and if so, how to do it? Or should I try a different approach?

I know there are quite a lot of questions regarding pool.map, and even though I spent some time searching I didn't find an answer. Also, if my code style is awkward, feel free to point it out. I've read that lambda looks strange to some and that I probably should use functools.partial.

Thanks in advance.

MLmuchAmaze
  • `lambda`s are fine and are *more general* than `partial`. In your case an equivalent form using `partial` would be something like `partial(operator.ge, constant)` which is much *less* readable than `lambda x: x < constant` (note also the change of operator due to the fact that `partial` has to "fill in" the arguments in order). – Bakuriu Jan 13 '14 at 18:43
  • [Multiprocessing with lambdas](https://medium.com/@yasufumy/python-multiprocessing-c6d54107dd55) – Petr Vepřek Dec 30 '20 at 18:01

3 Answers


In this simple case you can simply modify the preprocess function to accept a threshold argument. Something like:

def preprocess(data, threshold):
    def predicate(x):
        return x < threshold
    return old_preprocess(data, predicate)

Now in your preds list you can simply put the integers, which are picklable:

import functools

preds = [1, 2, 3, 42]
pool = Pool(processes=4)
pool.map(functools.partial(preprocess, data), preds)

(functools.partial fixes data as the first argument and maps over the thresholds; a partial of a module-level function is picklable, unlike a lambda.)
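A minimal self-contained sketch of this approach (old_preprocess here is a hypothetical stand-in that just filters a list; data is example data defined at module level):

```python
from multiprocessing import Pool
import functools

data = [0, 1, 2, 10, 50]  # hypothetical example data

def old_preprocess(data, predicate):
    # stand-in for the original preprocess: keep items passing the predicate
    return [x for x in data if predicate(x)]

def preprocess(data, threshold):
    # build the predicate inside the worker from a picklable integer
    def predicate(x):
        return x < threshold
    return old_preprocess(data, predicate)

if __name__ == '__main__':
    preds = [1, 2, 3, 42]
    pool = Pool(processes=4)
    # partial of a module-level function is picklable
    results = pool.map(functools.partial(preprocess, data), preds)
    print(results)  # one filtered list per threshold
```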

You can extend it to choose the operator by using the operator module:

def preprocess(data, pred):
    threshold, op = pred
    def predicate(x):
        return op(x, threshold)
    return old_preprocess(data, predicate)

import functools
import operator as op

preds = [(1, op.lt), (2, op.gt), (3, op.ge), (42, op.lt)]
pool = Pool(processes=4)
pool.map(functools.partial(preprocess, data), preds)
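Again as a runnable sketch (same hypothetical old_preprocess and example data as above):

```python
from multiprocessing import Pool
import functools
import operator

def old_preprocess(data, predicate):
    # stand-in for the original preprocess
    return [x for x in data if predicate(x)]

def preprocess(data, pred):
    # pred is a picklable (threshold, operator) tuple
    threshold, op = pred
    def predicate(x):
        return op(x, threshold)
    return old_preprocess(data, predicate)

if __name__ == '__main__':
    data = [0, 1, 2, 10, 50]
    preds = [(1, operator.lt), (2, operator.gt), (3, operator.ge), (42, operator.lt)]
    pool = Pool(processes=4)
    print(pool.map(functools.partial(preprocess, data), preds))
```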

To extend it with arbitrary predicates things get harder. Probably the easiest way to do this is to use the marshal module, which is able to convert the code of a function into a bytes object and back.

Something like:

real_preds = [marshal.dumps(pred.__code__) for pred in preds]

And then the preprocess should re-build the predicate functions:

import types

def preprocess(data, pred):
    pred = types.FunctionType(marshal.loads(pred), globals())
    # ...then use pred as before

Here's a MWE for this last suggestion:

>>> from multiprocessing import Pool
>>> import marshal
>>> import types
>>> def preprocess(pred):
...     pred = types.FunctionType(marshal.loads(pred), globals())
...     return pred(2)
... 
>>> preds = [lambda x: x < 1,
...          lambda x: x <2,
...          lambda x: x < 3,
...          lambda x: x < 42]
>>> real_preds = [marshal.dumps(pred.__code__) for pred in preds]
>>> pool = Pool(processes=4)
>>> pool.map(preprocess, real_preds)
[False, False, True, True]

Note that the function passed to pool.map must itself be picklable too, which means you cannot use a lambda as the first argument to Pool.map:

>>> pool.map(lambda x: preprocess(x), real_preds)
Exception in thread Thread-5:
Traceback (most recent call last):
  File "/usr/lib/python3.3/threading.py", line 639, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.3/threading.py", line 596, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.3/multiprocessing/pool.py", line 351, in _handle_tasks
    put(task)
  File "/usr/lib/python3.3/multiprocessing/connection.py", line 206, in send
    ForkingPickler(buf, pickle.HIGHEST_PROTOCOL).dump(obj)
_pickle.PicklingError: Can't pickle <class 'function'>: attribute lookup builtins.function failed

Regarding "is Pool.map the right tool?": I believe it highly depends on the size of the data. Using multiprocessing adds quite a lot of overhead, so even if you "make it work" there is a high chance it isn't worth it. In particular, in your edited question you give a more "real world" predicate:

lambda w: (w.lower() not in stopwords.words('english') and re.search("[a-z]", w.lower()))

I believe that this predicate doesn't take enough time to make Pool.map worth using. Obviously it depends on the size of w and on the number of elements to map.

Doing really quick tests with this predicate, I see that Pool.map starts to become faster when w is around 35000 characters long. If w is shorter than 1000 characters, Pool is about 15 times slower than a plain map (with 256 strings to check; with 60000-character strings, Pool is a bit faster).

Notice that if w is quite long then it is worth using a def instead of a lambda and avoiding the double computation of w.lower(), whether you use a plain map or Pool.map.
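For instance, a def that computes w.lower() once (STOPWORDS here is a small stand-in set; the question uses nltk's stopwords.words('english')):

```python
import re

# small stand-in for stopwords.words('english')
STOPWORDS = set(['the', 'a', 'and', 'is'])

def keep_word(w):
    # compute w.lower() once instead of twice as in the lambda
    lw = w.lower()
    return lw not in STOPWORDS and re.search("[a-z]", lw) is not None

words = ['The', 'Quick', '42', 'and', 'Foxes']
print([w for w in words if keep_word(w)])  # → ['Quick', 'Foxes']
```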

Bakuriu

You can do this with Pool.map, you just have to organize what you're mapping properly. Maps basically work like this:

result = map(function, things)

is equivalent to

result = []
for thing in things:
    result.append(function(thing))

or, more concisely,

result = [function(thing) for thing in things]

You can structure your function so that it accepts an argument (the upper bound) and does the comparison itself:

def mapme(bound):
    p = lambda x : x < bound
    return preprocess(data, p)

From there, it doesn't matter if you're doing a parallel map or a single threaded one. As long as preprocess doesn't have side effects, you can use a map.
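A sketch of how this maps over the bounds (assuming, per the comment below, that data is defined at module level; preprocess here is a hypothetical filtering stand-in):

```python
from multiprocessing import Pool

data = [0, 1, 5, 100]  # hypothetical module-level data

def preprocess(data, p):
    # stand-in for the real preprocess
    return [x for x in data if p(x)]

def mapme(bound):
    # the lambda stays local to the worker; only mapme crosses
    # the process boundary, and a module-level def pickles fine
    p = lambda x: x < bound
    return preprocess(data, p)

if __name__ == '__main__':
    pool = Pool(processes=4)
    print(pool.map(mapme, [1, 2, 3, 42]))
```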

nmichaels
  • I can only do this if data is globally defined which is not the case. Am I right? – MLmuchAmaze Jan 14 '14 at 16:01
  • I assumed from your question that it was, but you can put the `mapme` definition in the scope where data is defined, or throw extra parameters in where appropriate. – nmichaels Jan 14 '14 at 17:05

If you're using the functions for their side effects and don't need to use the unified output of pool.map(), you can just simulate it using os.fork() (at least on unix-like systems).

You could try something like this:

import numpy as np
import os

nprocs = 4
funcs = np.array_split(np.array(preds), nprocs)

# Forks the program into nprocs processes, each with a procid from 0 to nprocs-1
procid = 0
for x in range(1, nprocs):
    if os.fork() == 0:
        procid = x
        break

map(lambda p: preprocess(data, p), funcs[procid])
Dan
  • No, I need the results. Since it's a preprocessing step, I want to continue to process them. – MLmuchAmaze Jan 13 '14 at 18:53
  • @Kai Steinert: You could continue to process them separately, it's just difficult to combine the results later. – Dan Jan 13 '14 at 22:03