So I hope this isn't a duplicate, however I either haven't been able to find the adequate solution or I just am not 100% on what I'm looking for. I've written a program to thread lots of requests. I create a thread to
- Fetch responses from a number of api's such as this: share.yandex.ru/gpp.xml?url=MY_URL as well as scraping blogs
- Parse the responses of all requests from the example above/ json/ using python-goose to extract articles
- Return the parsed results back to the primary thread and insert into a database.
It's all been going well until it needs to pull back larger amounts of data which i haven't tested before. The primary reason for this is that it takes me over my shared memory limit on a shared Linux server (512mb) initiating a kill. This should be enough as it's only a few thousand requests, although i could be wrong. I'm clearing all large data variables/ objects within the main thread but that doesn't seem to help either.
I ran a memory_profile on the primary function which creates the threads with a thread class which looks like this:
class URLThread(Thread):
def __init__(self,request):
super(URLThread, self).__init__()
self.url = request['request']
self.post_id = request['post_id']
self.domain_id = request['domain_id']
self.post_data = request['post_params']
self.type = request['type']
self.code = ""
self.result = ""
self.final_results = ""
self.error = ""
self.encoding = ""
def run(self):
try:
self.request = get_page(self.url,self.type)
self.code = self.request['code']
self.result = self.request['result']
self.final_results = response_handler(dict(result=self.result,type=self.type,orig_url=self.url ))
self.encoding = chardet.detect(self.result)
self.error = self.request['error']
except Exception as e:
exc_type, exc_obj, exc_tb = sys.exc_info()
fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
errors.append((exc_type, fname, exc_tb.tb_lineno,e,'NOW()'))
pass
@profile
def multi_get(uris,timeout=2.0):
def alive_count(lst):
alive = map(lambda x : 1 if x.isAlive() else 0, lst)
return reduce(lambda a,b : a + b, alive)
threads = [ URLThread(uri) for uri in uris ]
for thread in threads:
thread.start()
while alive_count(threads) > 0 and timeout > 0.0:
timeout = timeout - UPDATE_INTERVAL
sleep(UPDATE_INTERVAL)
return [ {"request":x.url,
"code":str(x.code),
"result":x.result,
"post_id":str(x.post_id),
"domain_id":str(x.domain_id),
"final_results":x.final_results,
"error":str(x.error),
"encoding":str(x.encoding),
"type":x.type}
for x in threads ]
And the results look like this on the first batch of requests i pump through it (FYI it's a link as the output text isn't readable in here, i can't paste a html table or embed an image until i get 2 more points ):
http://tinypic.com/r/28c147d/8
And it doesn't seem to drop any of the memory in subsequent passes (I'm batching 100 requests/ threads through at 1 time). By this i mean once a batch of threads is complete they seem to stay in memory ad every time it runs another, memory is added as below:
Am I doing something really stupid here?