34

I have a short code that uses the multiprocessing package and works fine on my local machine.

When I uploaded to AWS Lambda and run there, I got the following error (stacktrace trimmed):

[Errno 38] Function not implemented: OSError
Traceback (most recent call last):
  File "/var/task/recorder.py", line 41, in record
    pool = multiprocessing.Pool(10)
  File "/usr/lib64/python2.7/multiprocessing/__init__.py", line 232, in Pool
    return Pool(processes, initializer, initargs, maxtasksperchild)
  File "/usr/lib64/python2.7/multiprocessing/pool.py", line 138, in __init__
    self._setup_queues()
  File "/usr/lib64/python2.7/multiprocessing/pool.py", line 234, in _setup_queues
    self._inqueue = SimpleQueue()
  File "/usr/lib64/python2.7/multiprocessing/queues.py", line 354, in __init__
    self._rlock = Lock()
  File "/usr/lib64/python2.7/multiprocessing/synchronize.py", line 147, in __init__
    SemLock.__init__(self, SEMAPHORE, 1, 1)
  File "/usr/lib64/python2.7/multiprocessing/synchronize.py", line 75, in __init__
    sl = self._semlock = _multiprocessing.SemLock(kind, value, maxvalue)
OSError: [Errno 38] Function not implemented

Can it be that a part of python's core packages in not implemented? I have no idea what am I running on underneath so I can't login there and debug.

Any ideas how can I run multiprocessing on Lambda?

Zach Moshe
  • 2,782
  • 4
  • 24
  • 40
  • Possible duplicate of [OSError 38 \[Errno 38\] with multiprocessing](https://stackoverflow.com/questions/6033599/oserror-38-errno-38-with-multiprocessing) – 200_success Jun 12 '18 at 21:11

5 Answers5

18

As far as I can tell, multiprocessing won't work on AWS Lambda because the execution environment/container is missing /dev/shm - see https://forums.aws.amazon.com/thread.jspa?threadID=219962 (login may be required).

No word (that I can find) on if/when Amazon will change this. I also looked at other libraries e.g. https://pythonhosted.org/joblib/parallel.html will fallback to /tmp (which we know DOES exist) if it can't find /dev/shm, but that doesn't actually solve the problem.

Alex R
  • 11,364
  • 15
  • 100
  • 180
glennji
  • 349
  • 1
  • 4
  • 4
    Can you elaborate on how to solve this problem with joblib? I am testing it out right now, and joblib is failing back to serial operations: `[Errno 38] Function not implemented. joblib will operate in serial mode` – pjgranahan Mar 17 '17 at 07:19
  • 4
    [This thread](https://github.com/joblib/joblib/issues/391) seems to suggest that joblib can not actually work around this issue. – pjgranahan Mar 17 '17 at 07:47
  • 1
    Yeah sorry, I never dug deep enough into this. Could well be unworkable. – glennji Mar 21 '17 at 02:56
  • 4
    Please update your answer. It looks misleading until a visitor has to read comments. – Mohamed Taher Alrefaie Jul 30 '18 at 16:56
11

You can run routines in parallel on AWS Lambda using Python's multiprocessing module but you can't use Pools or Queues as noted in other answers. A workable solution is to use Process and Pipe as outlined in this article https://aws.amazon.com/blogs/compute/parallel-processing-in-python-with-aws-lambda/

While the article definitely helped me get to a solution (shared below) there are a few things to be aware of. First, the Process and Pipe based solution is not as fast as the built in map function in Pool, though I did see nearly linear speed-ups as I increased the available memory/CPU resources in my Lambda function. Second there is a fair bit of management that has to be undertaken when developing multiprocessing functions in this way. I suspect this is at least partly why my solution is slower than the built in methods. If anyone has suggestions to speed it up I'd love to hear them! Finally, while the article notes that multiprocessing is useful for offloading asynchronous processes, there are other reasons to use multiprocessing such as lots of intensive math ops which is what I was trying to do. In the end I was happy enough with the performance improvement as it was much better than sequential execution!

The code:

# Python 3.6
from multiprocessing import Pipe, Process

def myWorkFunc(data, connection):
    result = None

    # Do some work and store it in result

    if result:
        connection.send([result])
    else:
        connection.send([None])


def myPipedMultiProcessFunc():

    # Get number of available logical cores
    plimit = multiprocessing.cpu_count()

    # Setup management variables
    results = []
    parent_conns = []
    processes = []
    pcount = 0
    pactive = []
    i = 0

    for data in iterable:
        # Create the pipe for parent-child process communication
        parent_conn, child_conn = Pipe()
        # create the process, pass data to be operated on and connection
        process = Process(target=myWorkFunc, args=(data, child_conn,))
        parent_conns.append(parent_conn)
        process.start()
        pcount += 1

        if pcount == plimit: # There is not currently room for another process
            # Wait until there are results in the Pipes
            finishedConns = multiprocessing.connection.wait(parent_conns)
            # Collect the results and remove the connection as processing
            # the connection again will lead to errors
            for conn in finishedConns:
                results.append(conn.recv()[0])
                parent_conns.remove(conn)
                # Decrement pcount so we can add a new process
                pcount -= 1

    # Ensure all remaining active processes have their results collected
    for conn in parent_conns:
        results.append(conn.recv()[0])
        conn.close()

    # Process results as needed
E Morrow
  • 266
  • 3
  • 5
  • This code is a little hard to follow. Is `myPipedMultiProcessFunc` a viable replacement for Pool.map()? – Alex R May 26 '19 at 16:49
  • As written `myPipedMultiProcessFunc` is much faster than running `myWorkFunc` in a sequential loop. It has been awhile since I wrote this, but my recollection is that this implementation is about 80% of the speed of `Pool.map()`. Happy to follow up if there are particular bits of my code that are unclear. – E Morrow May 27 '19 at 21:16
7

I have bumped into the same issue. This is the code I had previously that worked just fine in my local machine:

import concurrent.futures


class Concurrent:

    @staticmethod
    def execute_concurrently(function, kwargs_list):
        results = []
        with concurrent.futures.ProcessPoolExecutor() as executor:
            for _, result in zip(kwargs_list, executor.map(function, kwargs_list)):
                results.append(result)
        return results

And I replaced it with this one:

import concurrent.futures


class Concurrent:

    @staticmethod
    def execute_concurrently(function, kwargs_list):
        results = []
        with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
            futures = [executor.submit(function, kwargs) for kwargs in kwargs_list]
        for future in concurrent.futures.as_completed(futures):
            results.append(future.result())
        return results

Works like a charm.

Taken from this pull request

  • Note that this changes it to use multi-threading instead of multi-processing. It will run, but depending what each function execution is doing might not be as performant as using multi-processing might be. – afaulconbridge Oct 25 '21 at 13:31
2

multiprocessing.Pool and multiprocessing.Queue are not supported natively (because of the issue with SemLock), but multiprocessing.Process and multiprocessing.Pipe etc work properly in AWSLambda.

That should allow you to build a workaround solution by manually creating/forking processes and using a multiprocessing.Pipe for communication between the parent and child processes. Hope that helps

Pikamander2
  • 7,332
  • 3
  • 48
  • 69
whummer
  • 487
  • 3
  • 4
1

If you can use Python 3.7 (or earlier) on AWS Lambda, you should be ok as that doesn't use SemLock.

But if you just need async_results on AWS Lambda (without any additional requirements) with later versions of Python, here is an updated drop-in replacement (based on https://code.activestate.com/recipes/576519-thread-pool-with-same-api-as-multiprocessingpool/ ) :

import sys
import threading
from queue import Empty, Queue

SENTINEL = "QUIT"

def is_sentinel(obj):
    """
    Predicate to determine whether an item from the queue is the
    signal to stop
    """
    return type(obj) is str and obj == SENTINEL

class TimeoutError(Exception):
    """
    Raised when a result is not available within the given timeout
    """

class Pool(object):
    def __init__(self, processes, name="Pool"):
        self.processes = processes
        self._queue = Queue()
        self._closed = False
        self._workers = []
        for idx in range(processes):
            thread = PoolWorker(self._queue, name="Worker-%s-%d" % (name, idx))
            try:
                thread.start()
            except Exception:
                # If one thread has a problem, undo everything
                self.terminate()
                raise
            else:
                self._workers.append(thread)

    def apply_async(self, func, args, kwds):
        apply_result = ApplyResult()
        job = Job(func, args, kwds, apply_result)
        self._queue.put(job)
        return apply_result

    def close(self):
        self._closed = True

    def join(self):
        """
        This is only called when all are done.
        """
        self.terminate()

    def terminate(self):
        """
        Stops the worker processes immediately without completing
        outstanding work. When the pool object is garbage collected
        terminate() will be called immediately.
        """
        self.close()

        # Clearing the job queue
        try:
            while True:
                self._queue.get_nowait()
        except Empty:
            pass

        for thread in self._workers:
            self._queue.put(SENTINEL)


class PoolWorker(threading.Thread):
    """
    Thread that consumes WorkUnits from a queue to process them
    """
    def __init__(self, queue, *args, **kwds):
        """
        Args:
            queue: the queue of jobs
        """
        threading.Thread.__init__(self, *args, **kwds)
        self.daemon = True
        self._queue = queue

    def run(self):
        """
        Process the job, or wait for sentinel to exit
        """
        while True:
            job = self._queue.get()
            if is_sentinel(job):
                # Got sentinel
                break
            job.process()


class ApplyResult(object):
    """
    Container to hold results.
    """
    def __init__(self):
        self._data = None
        self._success = None
        self._event = threading.Event()

    def ready(self):
        is_ready = self._event.isSet()
        return is_ready

    def get(self, timeout=None):
        """
        Returns the result when it arrives. If timeout is not None and
        the result does not arrive within timeout seconds then
        TimeoutError is raised. If the remote call raised an exception
        then that exception will be reraised by get().
        """
        if not self.wait(timeout):
            raise TimeoutError("Result not available within %fs" % timeout)
        if self._success:
            return self._data
        raise self._data[0](self._data[1], self._data[2])

    def wait(self, timeout=None):
        """
        Waits until the result is available or until timeout
        seconds pass.
        """
        self._event.wait(timeout)
        return self._event.isSet()

    def _set_exception(self):
        self._data = sys.exc_info()
        self._success = False
        self._event.set()

    def _set_value(self, value):
        self._data = value
        self._success = True
        self._event.set()


class Job(object):
    """
    A work unit that corresponds to the execution of a single function
    """
    def __init__(self, func, args, kwds, apply_result):
        """
        Args:
            func: function
            args: function args
            kwds: function kwargs
            apply_result: ApplyResult object that holds the result
                of the function call
        """
        self._func = func
        self._args = args
        self._kwds = kwds
        self._result = apply_result

    def process(self):
        """
        Call the function with the args/kwds and tell the ApplyResult
        that its result is ready. Correctly handles the exceptions
        happening during the execution of the function
        """
        try:
            result = self._func(*self._args, **self._kwds)
        except Exception:
            self._result._set_exception()
        else:
            self._result._set_value(result)
Doug Blank
  • 2,031
  • 18
  • 36
  • thank you, using python3.7 is the only usable solution when the multiprocessing code is in a 3rd party package you don't have control over – Nimrod Morag Jul 01 '22 at 08:07