
I have this code (Python 3.5):

    datarALL = []
    with ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
        futh = [(executor.submit(self.getdata2, page, hed, data, apifolder,additional)) for page in pages]
        for data in as_completed(futh):
            datarALL = datarALL + data.result()
    return datarALL

This creates threads, executes a function in each, and combines the results into a single list.

When I execute it on a small scale it works great, but when the number of pages is large the script dies with:

Killed

When I monitor it using htop I can see that it gets Killed due to memory problems.

I tried to replace `datarALL = datarALL + data.result()` with writing to a file, so that each finished thread has its results on disk rather than in memory.

This is what I did:

    with ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
        futh = [(executor.submit(self.getdata2, page, hed, data, apifolder, additional)) for page in
                pages]
        for data in as_completed(futh):
            datarALL = [data.result()]
            with open("test.txt", "wb") as fp:
                pickle.dump(datarALL, fp)

    with open("test.txt", "rb") as fp:  # Unpickling
        b = pickle.load(fp)
    return b

But the memory isn't freed and the script still gets killed.

What can I do to solve this problem?

I need this script to support handling an unknown amount of data.
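
To illustrate the direction I'm trying to go, here is a rough sketch of the streaming-to-disk idea (it would live inside the same method as my current code, so `self`, `pages`, `hed`, `data`, `apifolder` and `additional` come from there). The file name `results.pkl` and the `iter_results` helper are just placeholder names I made up, and this is untested:

    import os
    import pickle
    from concurrent.futures import ThreadPoolExecutor, as_completed

    # Open the output file once for the whole run and dump each finished page
    # as its own pickle record, so the combined list never has to sit in memory.
    with open("results.pkl", "wb") as fp:
        with ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
            futh = [executor.submit(self.getdata2, page, hed, data, apifolder, additional)
                    for page in pages]
            for fut in as_completed(futh):
                pickle.dump(fut.result(), fp)  # one record per finished page

    # Later, read the records back one at a time instead of loading one big list.
    def iter_results(path="results.pkl"):
        with open(path, "rb") as fp:
            while True:
                try:
                    yield pickle.load(fp)  # pickle.load raises EOFError at end of file
                except EOFError:
                    break

Each page's result still has to fit in memory while it is being pickled, but the full combined list never does.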

Edit:

Adding the getdata2 code:

    def getdata2(self, page, hed, data, apifolder, additional):
        tries = 10
        for n in range(tries):
            try:
                value_limit = self.config._page_limit  # limit of records allowed per page
                value_offset = page * value_limit
                datarALL = []
                url = 'http://www.mywebsite.com/{2}?WithTotal=true&cultureid=2&offset={0}&limit={1}{3}'.format(
                    value_offset, value_limit, apifolder, additional)
                print ("Generate page: #{0} run #{1} with URL: {2}".format(page, n, url))
                responsedata = requests.get(url, data=data, headers=hed, verify=False)
                if responsedata.status_code == 200:  # 200 for successful call
                    responsedata = responsedata.text
                    jsondata = json.loads(responsedata)
                    if "results" in jsondata:
                        if jsondata["results"]:
                            datarALL = datarALL + jsondata["results"]
                print ("page {} finished".format(page))
                return datarALL

            except ChunkedEncodingError as e:
                print ("page #{0} run #{1} failed. Retry.".format(page, n))
                if n == tries - 1:
                    print ("page {0} could not be imported. Max retries reached.".format(page))
                    print("Unexpected error:", sys.exc_info()[0])
                    raise e

Log:

num of records to import is 21348
num of pages to import is 86
2018-08-27 09:47:42.210912 Generate page: #0 run #0 with URL: http://www.myweb.com?WithTotal=true&cultureid=2&offset=0&limit=249
2018-08-27 09:47:42.218939 Generate page: #1 run #0 with URL: http://www.myweb.com?WithTotal=true&cultureid=2&offset=249&limit=249
2018-08-27 09:47:42.227159 Generate page: #2 run #0 with URL: http://www.myweb.com?WithTotal=true&cultureid=2&offset=498&limit=249
2018-08-27 09:47:42.228641 Generate page: #3 run #0 with URL: http://www.myweb.com?WithTotal=true&cultureid=2&offset=747&limit=249
2018-08-27 09:48:03.721129 page 0 finished
2018-08-27 09:48:03.721510 Generate page: #4 run #0 with URL: http://www.myweb.com?WithTotal=true&cultureid=2&offset=996&limit=249
2018-08-27 09:48:19.740866 page 2 finished
2018-08-27 09:48:19.741651 Generate page: #5 run #0 with URL: http://www.myweb.com?WithTotal=true&cultureid=2&offset=1245&limit=249
2018-08-27 09:48:23.633712 page 4 finished
2018-08-27 09:48:23.634187 Generate page: #6 run #0 with URL: http://www.myweb.com?WithTotal=true&cultureid=2&offset=1494&limit=249
2018-08-27 09:48:43.598300 page 1 finished
2018-08-27 09:48:43.599237 Generate page: #7 run #0 with URL: http://www.myweb.com?WithTotal=true&cultureid=2&offset=1743&limit=249
page #6 run #0 failed. Retry.
2018-08-27 09:48:43.671394 Generate page: #6 run #1 with URL: http://www.myweb.com?WithTotal=true&cultureid=2&offset=1494&limit=249
page #5 run #0 failed. Retry.
2018-08-27 09:48:44.198029 Generate page: #5 run #1 with URL: http://www.myweb.com?WithTotal=true&cultureid=2&offset=1245&limit=249
2018-08-27 09:48:57.072556 page 6 finished
2018-08-27 09:48:57.073005 Generate page: #8 run #0 with URL: http://www.myweb.com?WithTotal=true&cultureid=2&offset=1992&limit=249
2018-08-27 09:49:11.236083 page 5 finished
2018-08-27 09:49:11.245397 Generate page: #9 run #0 with URL: http://www.myweb.com?WithTotal=true&cultureid=2&offset=2241&limit=249
2018-08-27 09:49:13.057340 page 8 finished
2018-08-27 09:49:13.057516 Generate page: #10 run #0 with URL: http://www.myweb.com?WithTotal=true&cultureid=2&offset=2490&limit=249
2018-08-27 09:49:33.802848 page 3 finished
2018-08-27 09:49:33.813404 Generate page: #11 run #0 with URL: http://www.myweb.com?WithTotal=true&cultureid=2&offset=2739&limit=249
2018-08-27 09:49:41.440440 page 10 finished
2018-08-27 09:49:41.440915 Generate page: #12 run #0 with URL: http://www.myweb.com?WithTotal=true&cultureid=2&offset=2988&limit=249
page #7 run #0 failed. Retry.
2018-08-27 09:49:41.500190 Generate page: #7 run #1 with URL: http://www.myweb.com?WithTotal=true&cultureid=2&offset=1743&limit=249
2018-08-27 09:49:50.171217 page 11 finished
2018-08-27 09:49:50.189446 Generate page: #13 run #0 with URL: http://www.myweb.com?WithTotal=true&cultureid=2&offset=3237&limit=249
2018-08-27 09:49:54.881509 page 12 finished
2018-08-27 09:49:54.881826 Generate page: #14 run #0 with URL: http://www.myweb.com?WithTotal=true&cultureid=2&offset=3486&limit=249
2018-08-27 09:50:06.699138 page 14 finished
2018-08-27 09:50:06.708714 Generate page: #15 run #0 with URL: http://www.myweb.com?WithTotal=true&cultureid=2&offset=3735&limit=249
2018-08-27 09:50:17.203238 page 13 finished
2018-08-27 09:50:17.203766 Generate page: #16 run #0 with URL: http://www.myweb.com?WithTotal=true&cultureid=2&offset=3984&limit=249
2018-08-27 09:50:18.200983 page 15 finished
2018-08-27 09:50:18.201452 Generate page: #17 run #0 with URL: http://www.myweb.com?WithTotal=true&cultureid=2&offset=4233&limit=249
2018-08-27 09:50:29.642942 page 7 finished
.
.
.
2018-08-27 09:55:59.088085 page 42 finished
2018-08-27 09:55:59.088767 Generate page: #44 run #0 with URL: http://www.myweb.com?WithTotal=true&cultureid=2&offset=10956&limit=249
Killed

Updated code:

    datarALL = []
    with ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
        futh = [(executor.submit(self.getdata3, page, hed, data, apifolder, additional)) for page in pages]
        for data in as_completed(futh):
            # datarALL = datarALL + data.result()
            datarALL.append(data.result())

    return datarALL

and the function:

    def getdata3(self, page, hed, data, apifolder, additional):
        tries = 10
        for n in range(tries):
            try:
                value_limit = self.config.page_limit  # limit of records allowed per page
                value_offset = page * value_limit
                datarALL = []
                url = 'http://www.mywebsite.com/{2}?WithTotal=true&cultureid=2&offset={0}&limit={1}{3}'.format(
                    value_offset, value_limit, apifolder, additional)
                print ("{3} Generate page: #{0} run #{1} with URL: {2}".format(page, n, url, str(datetime.now())))
                responsedata = requests.get(url, data=data, headers=hed, verify=False)
                if responsedata.status_code == 200:  # 200 for successful call
                    responsedata = responsedata.text
                    jsondata = json.loads(responsedata)
                    if "results" in jsondata:
                        if jsondata["results"]:
                            datarALL.append(jsondata["results"])
                print ("{1} page {0} finished".format(page, str(datetime.now())))
                return datarALL

            except ChunkedEncodingError as e:
                print ("page #{0} run #{1} failed. Retry.".format(page, n))
                if n == tries - 1:
                    print ("page {0} could not be imported. Max retries reached.".format(page))
                    print("Unexpected error:", sys.exc_info()[0])
                    raise e
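
For clarity about the `+` versus `append` change (it also comes up in the comments below), a tiny standalone illustration, separate from my script:

    a = [1, 2]
    a.append([3, 4])  # append adds the whole list as a single element -> [1, 2, [3, 4]]

    b = [1, 2]
    b = b + [3, 4]    # + concatenates, but builds a brand new list    -> [1, 2, 3, 4]

    c = [1, 2]
    c.extend([3, 4])  # extend concatenates in place                   -> [1, 2, 3, 4]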
  • `datarALL.append(data.result())` and not `datarALL = datarALL + data.result()` – Burhan Khalid Aug 27 '18 at 07:28
  • What does `self.getdata2` do? – Yaniv Oliver Aug 27 '18 at 07:29
  • @yanivoliver self.getdata2 is a function that connects to the API and makes a GET request. It returns the data. – Programmer120 Aug 27 '18 at 07:30
  • @BurhanKhalid I will check it in a minute but from what I read about it https://stackoverflow.com/questions/2022031/python-append-vs-operator-on-lists-why-do-these-give-different-results there might still be a problem of memory if importing GBs of data.... – Programmer120 Aug 27 '18 at 07:35
  • If you're still running out of memory in the second case, the issue is likely somewhere else. Since you mention "pages", best is to try and process the data by page (or a limited set of pages) at a time. – 9769953 Aug 27 '18 at 07:37
  • @9769953 records per page can not exceed 249 (API limitation). I tried playing with the number of pages, from max to min; eventually I'm always out of memory. – Programmer120 Aug 27 '18 at 07:40
  • @yanivoliver see edit. added the function code. – Programmer120 Aug 27 '18 at 07:41
  • Have you tried without threads, i.e., just linear processing, page by page? – 9769953 Aug 27 '18 at 07:53
  • @9769953 I did, but it never finished. I stopped it when it took over 3 hours... Threads seem to work. With Burhan Khalid's fix I now manage to get through more pages, but it still gets Killed. – Programmer120 Aug 27 '18 at 07:57
  • You are opening and closing the same file and overwriting the same data in your pickle dump loop. You need to get rid of that entire section. – Burhan Khalid Aug 27 '18 at 08:03
  • @BurhanKhalid how? – Programmer120 Aug 27 '18 at 08:06
  • @BurhanKhalid the dump is done one time by each thread with its own data, isn't it? – Programmer120 Aug 27 '18 at 08:23
  • But did you see a (slow) increase in memory over those 3 hours, or did it remain nearly constant? Because if the latter, that shows that the problem is all the threads running at once, each requiring a certain amount of memory, thus adding up. – 9769953 Aug 27 '18 at 09:04
  • @9769953 I didn't check... I am positive the problem is with how I treat the return data from the threads and not with the thread itself... I'm not doing anything sophisticated here.. there is not even locks or shared resources... – Programmer120 Aug 27 '18 at 09:16
  • That's not what I mean: if some of the threads are taking their time because there is a large amount of data to download for those specific threads, you'll have multiple slow, long-working, threads all gathering a significant amount of data, that all has to stay in memory at once. (How) can you exclude that? – 9769953 Aug 27 '18 at 09:23
  • @9769953 I limit the run to only 4 active threads. When one finishes another starts. Notice: max_workers=os.cpu_count() – Programmer120 Aug 27 '18 at 09:26
  • Keep in mind though, that the threads requiring the largest download will be the ones taking the most time, and thus you'll end up with 4 threads downloading a lot of data. – 9769953 Aug 27 '18 at 09:27
  • While I guess 4 threads isn't a large number (I didn't have your number of CPUs before), can you be sure that none of the downloads contains an exceedingly large amount of data? Since the single-threaded function takes 3 hours, I wonder if there are a few cases where the data download is (too) large. – 9769953 Aug 27 '18 at 09:29
  • @9769953 all threads import the exact same number of records, 245. I can see their progress. Wait, I'll edit my question with a detailed log so you can see. – Programmer120 Aug 27 '18 at 09:44
  • Side note: why do you have `datarALL = datarALL + jsondata["results"]` in `getdata2` when there is no loop, and thus no need for appending? Why not simply `datarAll = jsondata['results']`? It won't matter, but it seems weird. – 9769953 Aug 27 '18 at 09:53
  • @9769953 see log. I already changed the datarALL = datarALL + jsondata["results"] to datarALL.append( jsondata["results"]) – Programmer120 Aug 27 '18 at 10:43
  • No, the side note was related to the line in the thread; that append/`+` is unnecessary in the thread. In the main thread, it should matter little which version you use. – 9769953 Aug 27 '18 at 10:59
  • @9769953 see update to the functions.. let me know what you think. Problem still exists. – Programmer120 Aug 27 '18 at 11:05
  • Judging by the time taken for a call to `getdata`, I have the feeling the size of each entry is large (or your connection is very slow). How large is each response? – Yaniv Oliver Aug 27 '18 at 11:23
  • @yanivoliver Large. But regardless, even if I change the limit to 1, I'll eventually face the same problem. I need to find a way to save the results to the disk so the memory won't build up. Once I have it on disk I will simply work on that file. – Programmer120 Aug 27 '18 at 11:28
