At the moment my program starts several producer threads and one consumer thread. All of them insert elements into or remove elements from a shared queue object, and each keeps a reference to it.
I know that the garbage collector frees memory once there are no more references to an object, but that is not my problem: I have to keep references to the queue in order to keep inserting and removing items.
Running the program, I noticed that when I insert an element, the memory allocated for the queue grows along with its length. When I remove an element, the length decreases but the memory allocated for the queue does not. Is there a way to shrink the allocated memory as well? As it stands, the allocated memory keeps growing with every inserted element until it occupies all available memory.
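One way to check whether the queue's own container is what holds the memory is to measure it directly. The sketch below assumes CPython, where queue.Queue keeps its items in a collections.deque stored in the attribute `queue` (an implementation detail, not a documented API). Note that sys.getsizeof reports only the container itself, not the elements stored in it.

```python
import queue
import sys

q = queue.Queue()

# Assumption (CPython detail): the items live in a collections.deque at q.queue.
empty_size = sys.getsizeof(q.queue)

# Fill the queue and measure the container again.
for i in range(100_000):
    q.put(i)
full_size = sys.getsizeof(q.queue)

# Drain the queue completely and measure once more.
while not q.empty():
    q.get()
drained_size = sys.getsizeof(q.queue)

print("empty:", empty_size, "full:", full_size, "drained:", drained_size)
```

If the drained size drops back down while the memory of the process as a whole stays high, that would suggest the memory is being held somewhere other than the queue container, for example by the allocator or by other references to the elements.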
UPDATE: After some research I found this thread, which describes a problem very similar to mine: https://python-forum.io/Thread-Queue-Queue-would-not-reduce-capacity-after-get It also suggests creating a new queue and swapping it in. How can I do that efficiently?
An example of my code:
The main thread creates process_queue and starts the threads
#this is the main thread
import queue
process_queue = ProcessQueue()
#main thread starts StartProcessThread and SendingThread, passes process_queue as param
ProcessQueue
class ProcessQueue():
    def __init__(self):
        self.q = queue.Queue()

    def put(self, elem):
        self.q.put(elem)

    def get(self):
        elem = self.q.get()
        temp = queue.Queue()
        #swap queue
        return elem

    def task_done(self):
        self.q.task_done()
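The "swap queue" step could be sketched roughly as follows. This is a minimal sketch, not a definitive implementation: the class name `SwappingQueue` and its attributes are hypothetical, and it assumes a single consumer thread and an unbounded queue (so `put` never blocks while holding the lock). It deliberately leaves out `task_done`, because the unfinished-task count of the old queue would be lost on every swap.

```python
import queue
import threading


class SwappingQueue:
    """Hypothetical wrapper that replaces the inner queue.Queue once it is drained.

    Assumptions: exactly one consumer thread calls get(), and the inner
    queue is unbounded, so put() never blocks while the lock is held.
    """

    def __init__(self):
        self._lock = threading.Lock()
        self._q = queue.Queue()

    def put(self, elem):
        # Hold the lock so an item can never land in a queue that is
        # being discarded at the same moment.
        with self._lock:
            self._q.put(elem)

    def get(self):
        # Block outside the lock so producers can keep inserting.
        # Only the single consumer ever rebinds self._q, so reading it
        # here without the lock is safe under the stated assumption.
        elem = self._q.get()
        with self._lock:
            if self._q.empty():
                # Drop the drained queue; the old object becomes garbage.
                self._q = queue.Queue()
        return elem
```

Swapping only when the queue is empty avoids copying any elements, so the extra cost per `get` is one lock acquisition and, occasionally, one new queue.Queue object. Whether this actually lowers the memory used by the process depends on the interpreter and its allocator.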
StartProcessThread
from concurrent.futures import ThreadPoolExecutor
from subprocess import Popen
from threading import Thread
from time import sleep

class StartProcessThread(Thread):
    def __init__(self, process_queue):
        super().__init__()
        self.process_queue = process_queue
        #some vars

    #some methods

    def start_download(self, name):
        try:
            #starts a process
            process = Popen(
                ["path", "params"])
        except Exception:
            return False
        process_list = [process, name]
        self.process_queue.put(process_list)
        return True

    def task(self, url, name):
        #something
        if self.start_download(name):
            #something
            return True
        else:
            return False

    def run(self):
        # threadpoolexecutor
        executor = ThreadPoolExecutor(max_workers=3)
        while True:
            strm = #get something from db
            if strm:
                executor.submit(self.task, url, name)
            else:
                sleep(10)
SendingThread
class SendingThread(Thread):
    def __init__(self, process_queue):
        super().__init__()
        self.process_queue = process_queue

    def get_process(self):
        process = self.process_queue.get()
        poll = process[0].poll()
        # if poll is None, the process is still alive
        if poll is not None:
            return process
        # still running: put it back in the queue
        self.process_queue.put(process)
        return []

    def run(self):
        while True:
            process = self.get_process()
            if process:
                # do something
                pass