If you can use Python 3.7 (or earlier) on AWS Lambda, you should be OK, as that version doesn't use SemLock.
But if you just need async results (via `apply_async`) on AWS Lambda, without any additional requirements, and you are on a later version of Python, here is an updated drop-in replacement (based on https://code.activestate.com/recipes/576519-thread-pool-with-same-api-as-multiprocessingpool/):
import sys
import threading
from queue import Empty, Queue
# Marker value placed on the job queue to tell a worker to stop.
SENTINEL = "QUIT"


def is_sentinel(obj):
    """
    Tell whether an item taken from the queue is the stop signal.

    Only an object whose type is exactly `str` and which compares equal
    to SENTINEL qualifies; anything else is treated as regular work.
    """
    if type(obj) is not str:
        return False
    return obj == SENTINEL
class TimeoutError(Exception):
    """Signal that a result did not become available before the timeout."""
class Pool(object):
    """
    Thread-backed pool exposing a small subset of the
    multiprocessing.Pool API: apply_async(), close(), join() and
    terminate().

    Submitted calls are wrapped in Job objects and pushed onto a shared
    FIFO queue that is consumed by the PoolWorker threads.
    """

    def __init__(self, processes, name="Pool"):
        """
        Start the worker threads.

        Args:
            processes: number of worker threads to start
            name: label embedded in each worker thread's name
        """
        self.processes = processes
        self._queue = Queue()
        self._closed = False
        self._workers = []
        for idx in range(processes):
            thread = PoolWorker(self._queue, name="Worker-%s-%d" % (name, idx))
            try:
                thread.start()
            except Exception:
                # If one thread has a problem, undo everything
                self.terminate()
                raise
            else:
                self._workers.append(thread)

    def apply_async(self, func, args=(), kwds=None):
        """
        Queue `func(*args, **kwds)` for execution by a worker.

        Args:
            func: callable to run
            args: positional arguments for `func` (defaults to none)
            kwds: keyword arguments for `func`; None means none

        Returns:
            The ApplyResult that will receive the value or exception.
        """
        if kwds is None:
            # A fresh dict per call avoids sharing a mutable default.
            kwds = {}
        apply_result = ApplyResult()
        self._queue.put(Job(func, args, kwds, apply_result))
        return apply_result

    def close(self):
        """Mark the pool as closed."""
        self._closed = True

    def join(self):
        """
        Wait for the queued jobs to complete and the workers to exit.

        The stop markers are queued *behind* any pending jobs, so work
        submitted before join() is still executed rather than discarded.
        """
        self.close()
        self._stop_workers()
        caller = threading.current_thread()
        for worker in self._workers:
            # A job that calls join() on its own pool must not wait for
            # the thread it is running on.
            if worker is not caller:
                worker.join()

    def terminate(self):
        """
        Drop every job that has not started yet and tell the workers to
        exit. Jobs that are already executing are not interrupted, and
        this method does not wait for the workers to finish.
        """
        self.close()
        # Clearing the job queue
        try:
            while True:
                self._queue.get_nowait()
        except Empty:
            pass
        self._stop_workers()

    def _stop_workers(self):
        """Queue one stop marker for each worker thread still running."""
        for worker in self._workers:
            if worker.is_alive():
                self._queue.put(SENTINEL)
class PoolWorker(threading.Thread):
    """
    Daemon thread that pulls jobs off a shared queue and runs them
    """

    def __init__(self, queue, *args, **kwds):
        """
        Args:
            queue: the queue the jobs are read from
            *args, **kwds: forwarded unchanged to threading.Thread
        """
        super().__init__(*args, **kwds)
        self.daemon = True
        self._queue = queue

    def run(self):
        """
        Keep processing jobs until the stop sentinel is dequeued
        """
        item = self._queue.get()
        while not is_sentinel(item):
            item.process()
            item = self._queue.get()
class ApplyResult(object):
    """
    Container for the outcome of one asynchronous call.

    The producer stores either a value or the active exception and then
    sets an internal event; consumers block on that event through
    wait() or get().
    """

    def __init__(self):
        # Either the returned value or the sys.exc_info() triple.
        self._data = None
        # True for a value, False for an exception, None while pending.
        self._success = None
        self._event = threading.Event()

    def ready(self):
        """Return True once a value or an exception has been stored."""
        return self._event.is_set()

    def get(self, timeout=None):
        """
        Returns the result when it arrives. If timeout is not None and
        the result does not arrive within timeout seconds then
        TimeoutError is raised. If the call raised an exception then
        that very exception instance is re-raised by get().
        """
        if not self.wait(timeout):
            raise TimeoutError("Result not available within %fs" % timeout)
        if self._success:
            return self._data
        # Re-raise the stored instance itself. Rebuilding it from its
        # class would garble its arguments and breaks for exception
        # types whose constructor does not accept two positionals.
        _exc_type, exc_value, _exc_tb = self._data
        raise exc_value

    def wait(self, timeout=None):
        """
        Block until the result is available or until timeout seconds
        pass. Returns True if the result is available.
        """
        self._event.wait(timeout)
        return self._event.is_set()

    def _set_exception(self):
        """Record the exception currently being handled as the outcome."""
        self._data = sys.exc_info()
        self._success = False
        self._event.set()

    def _set_value(self, value):
        """Record `value` as the successful outcome."""
        self._data = value
        self._success = True
        self._event.set()
class Job(object):
    """
    A work unit that corresponds to the execution of a single function
    """

    def __init__(self, func, args, kwds, apply_result):
        """
        Args:
            func: function
            args: function args
            kwds: function kwargs
            apply_result: object that receives the outcome through its
                _set_value(value) / _set_exception() methods
        """
        self._func = func
        self._args = args
        self._kwds = kwds
        self._result = apply_result

    def process(self):
        """
        Call the function with the args/kwds and hand the outcome to the
        result object: the return value on success, otherwise the
        exception that is being handled.
        """
        try:
            result = self._func(*self._args, **self._kwds)
        except BaseException:
            # Catch BaseException, not just Exception: if SystemExit or
            # similar escaped here, the result would never be set and
            # anyone waiting on it would block forever.
            self._result._set_exception()
        else:
            self._result._set_value(result)