I have some Python code that creates multiple processes to complete a task much more quickly. When I create these processes I pass in a queue, and inside the processes I call queue.put(data) so I can retrieve the data outside of the processes. It works great on my local machine, but when I upload the zip to an AWS Lambda function (Python 3.8) it says the Queue() function has not been implemented. The project runs fine in the Lambda when I simply take out the queue functionality, so I know this is the only hang-up I currently have.
I made sure to install the multiprocessing package directly into my Python project by using "pip install multiprocess -t ./" as well as "pip install boto3 -t ./".
I am new to Python as well as AWS, but the research I have done so far potentially points me to SQS.
Reading over the SQS docs, I am not sure that is exactly what I am looking for.
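For reference, this is roughly the send path I gathered from the boto3 SQS docs. push_results is a hypothetical helper name, and I pass the client in rather than creating it with boto3.client("sqs") inside, just to sketch the idea:

```python
import json

def push_results(sqs_client, queue_url, listings):
    # In the Lambda this client would come from boto3.client("sqs").
    # SQS message bodies must be strings, so the listings are JSON-encoded.
    return sqs_client.send_message(QueueUrl=queue_url, MessageBody=json.dumps(listings))
```

If I understand the docs, each process would send its results as a message, and something else would have to poll the queue to read them back out, which is why I am not sure it fits my case.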
Here is the code I am running in the Lambda that works locally but not on AWS. See the *'s for important pieces:
from multiprocessing import Process, Queue
from craigslist import CraigslistForSale
import time
import math
sitesHold = ["sfbay", "seattle", "newyork", "(many more)..." ]
results = []
def f(sites, category, search_keys, queue):
    local_results = []
    for site in sites:
        cl_fs = CraigslistForSale(site=site, category=category, filters={'query': search_keys})
        for result in cl_fs.get_results(sort_by='newest'):
            local_results.append(result)
    if len(local_results) > 0:
        print(local_results)
    queue.put(local_results)  # Putting data *********************************
def scan_handler(event, context):
    started_at = time.monotonic()
    queue = Queue()
    print("Running...")
    amount_of_lists = int(event['amountOfLists'])
    list_length = int(len(sitesHold) / amount_of_lists)
    extra_lists = math.ceil((len(sitesHold) - (amount_of_lists * list_length)) / list_length)
    site_list = []
    list_creator_counter = 0
    site_counter = 0
    for i in range(amount_of_lists + extra_lists):
        site_list.append(sitesHold[list_creator_counter:list_creator_counter + list_length])
        list_creator_counter += list_length
    processes = []
    for i in range(len(site_list)):
        site_counter = site_counter + len(site_list[i])
        processes.append(Process(target=f, args=(site_list[i], event['category'], event['searchQuery'], queue,)))  # Creating processes and creating queues ***************************
    for process in processes:
        process.start()  # Starting processes ***********************
    for process in processes:
        listings = queue.get()  # Getting from queue ****************************
        if len(listings) > 0:
            for listing in listings:
                results.append(listing)
    print(f"Results: {results}")
    for process in processes:
        process.join()
    total_time_took = time.monotonic() - started_at
    print(f"Sites processed: {site_counter}")
    print(f'Took {total_time_took} seconds long')
This is the error the Lambda function is giving me:
{
  "errorMessage": "[Errno 38] Function not implemented",
  "errorType": "OSError",
  "stackTrace": [
    "  File \"/var/task/main.py\", line 90, in scan_handler\n    queue = Queue()\n",
    "  File \"/var/lang/lib/python3.8/multiprocessing/context.py\", line 103, in Queue\n    return Queue(maxsize, ctx=self.get_context())\n",
    "  File \"/var/lang/lib/python3.8/multiprocessing/queues.py\", line 42, in __init__\n    self._rlock = ctx.Lock()\n",
    "  File \"/var/lang/lib/python3.8/multiprocessing/context.py\", line 68, in Lock\n    return Lock(ctx=self.get_context())\n",
    "  File \"/var/lang/lib/python3.8/multiprocessing/synchronize.py\", line 162, in __init__\n    SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)\n",
    "  File \"/var/lang/lib/python3.8/multiprocessing/synchronize.py\", line 57, in __init__\n    sl = self._semlock = _multiprocessing.SemLock(\n"
  ]
}
Does Queue() work in an AWS Lambda? What is the best way to accomplish my goal?
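In case it helps frame the question: one workaround my research turned up is that multiprocessing.Pipe apparently does work in Lambda even though Queue does not, because Queue's internal lock needs a shared-memory semaphore (/dev/shm) that the Lambda sandbox doesn't provide. A minimal sketch of that pattern, with hypothetical worker/handler names standing in for my actual scraper code:

```python
from multiprocessing import Process, Pipe

def worker(conn, data):
    # Each worker sends its chunk of results through its end of the pipe
    conn.send([d * 2 for d in data])
    conn.close()

def handler():
    chunks = [[1, 2], [3, 4]]
    processes = []
    parent_conns = []
    for chunk in chunks:
        parent_conn, child_conn = Pipe()
        parent_conns.append(parent_conn)
        p = Process(target=worker, args=(child_conn, chunk))
        processes.append(p)
        p.start()
    results = []
    for conn in parent_conns:
        results.extend(conn.recv())  # recv() blocks until that child sends
    for p in processes:
        p.join()
    return results
```

If that is the right direction, I would adapt f to send its local_results through the child end of a pipe instead of calling queue.put. Is that the recommended approach, or is SQS still the better fit here?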