
I have a function that loops over values from a dictionary. I want to split my dict keys so I can break my dict into parts equal to my number of CPUs (a chunking sketch follows the function below). My function is:

from operator import itemgetter  # needed for the sorting below

def find_something2(new2, threl=2.0, my_limit=150, far=365):
    """Find stocks that are worth buying."""
    global current_date, total_money, min_date, current_name, dates_dict, mylist, min_date_sell, reduced_stocks
    worthing = list()
    for stock in new2:
        frame = reduced_stocks[stock]
        temp = frame.loc[current_date:end_date]
        if not temp.empty:
            mydate = temp.head(far).Low.idxmin()
            if mydate <= min_date_sell:
                my_min = temp.head(far).Low.min()
                if total_money >= my_min > 0:  # find the min date at four months
                    ans, res, when_sell, total, income = worth_buy(stock, frame, mydate, 'Low',
                                                                   thres=threl, sell_limit=my_limit)
                    if ans:
                        if income > 3 * 10 ** 6:
                            worthing.append([mydate, stock, res, when_sell, total, income])
    if current_date > '1990-01-01':
        return sorted(worthing, key=itemgetter(0))
    elif current_date > '1985-01-01':
        return sorted(worthing, key=itemgetter(0))
    else:
        answer = sorted(worthing, key=itemgetter(5), reverse=True)
        return answer[::11]
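
For reference, one possible way to split the dict keys into roughly one chunk per CPU (only a sketch: it uses multiprocessing.cpu_count(), and the ceiling-division chunk size is just one option):

import multiprocessing as mp

def chunk_keys(d, n_chunks=None):
    """Split a dict's keys into roughly equal lists, one per CPU by default."""
    keys = list(d)
    n_chunks = n_chunks or mp.cpu_count()
    size = max(1, -(-len(keys) // n_chunks))  # ceiling division so no key is dropped
    return [keys[i:i + size] for i in range(0, len(keys), size)]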

So what I have tried is:

import multiprocessing as mp
result_list = []
def log_result(result):
    # This is called whenever foo_pool(i) returns a result.
    # result_list is modified only by the main process, not the pool workers.
    global result_list
    result_list.append(result)
def apply_async_with_callback():
    global reduced_stocks
    temp = list(reduced_stocks.keys())
    temp1 = temp[0:1991]
    temp2 = temp[1991:]
    temp = [temp1, temp2]
    pool = mp.Pool(2)
    for i in temp:
        pool.apply_async(find_something2, args=(i, 1.1, 2200, 1,), callback=log_result)
    pool.close()
    pool.join()
    print(result_list)

if __name__ == '__main__':
    apply_async_with_callback()

Is this the right way?

I also tried threads, but CPU usage maxes out at 15% although I am using 12 threads (I have a 6-core Intel CPU):

def pare():
    from threading import Thread
    relist = list(reduced_stocks.keys())
    sublist = [relist[x:x + 332] for x in range(0, len(relist), 332)]
    data = [x for x in sublist]
    threads = list()
    for i in range(12):
        process = Thread(target=find_something2, args=(1.4, 2500, 8, data[i], i, results))
        process.start()
        threads.append(process)
    for process in threads:
        process.join()

1 Answer


One way to do multiprocessing is to create a Pool and pass the prepared data to it, wait for the computation to finish, and then process the results. The code below suggests how to do that.

# setup the function so it gets everything from arguments
def find_something2(new2, threl, my_limit, far, current_date, total_money, min_date_sell, reduced_stocks, end_date):
    # ....
    pass

# prepare the data
# replace the a1, a2 ... with the actual parameters your function takes
data = [(a1, a2, a3, ...) for your_data in your_dict]

import multiprocessing as mp
with mp.Pool() as pool:
    results = pool.starmap(find_something2, data)

    print(results)
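
For example, building on the refactored signature above with the values from the question's apply_async call (threl=1.1, my_limit=2200, far=1), data could be prepared with one argument tuple per chunk of keys. This is only a sketch: it assumes reduced_stocks and the date/money variables are already built in the main script, as in the question.

import multiprocessing as mp

if __name__ == '__main__':
    # reduced_stocks, current_date, total_money, min_date_sell and end_date are
    # assumed to be defined here in the main script, as in the question
    keys = list(reduced_stocks)
    n = mp.cpu_count()
    size = max(1, -(-len(keys) // n))        # ceiling division: roughly one chunk per CPU
    chunks = [keys[i:i + size] for i in range(0, len(keys), size)]

    # one argument tuple per chunk, matching the refactored signature above;
    # 1.1, 2200 and 1 are the values used in the question's apply_async call
    data = [(chunk, 1.1, 2200, 1, current_date, total_money,
             min_date_sell, reduced_stocks, end_date) for chunk in chunks]

    with mp.Pool() as pool:
        results = pool.starmap(find_something2, data)  # one returned list per chunk
    print(results)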
Marek Schwarz
  • Why do I have to pass everything as arguments instead of using global variables? – Engineer_auth1234 Oct 23 '19 at 15:28
  • You don't "have to", but in my experience it is safer: you clearly see what gets passed where. Also, you can't simply update global state from within a multiprocessing worker; see this [thread](https://stackoverflow.com/questions/11055303/multiprocessing-global-variable-updates-not-returned-to-parent) – Marek Schwarz Oct 23 '19 at 15:55
  • I get unexpected results again. It seems to run, and then functions that should not have been called are running. find_something does not change any global value. I just want to slice my data and have each process take a part of it. My function returns a list, so I just want the result to be a list of lists. – Engineer_auth1234 Oct 23 '19 at 18:58
  • `find_something` should not change any of the globals (as explained in the link posted previously). If you prepared the `data` correctly, then `find_something` should receive **one** iteration of your data (to process a list of `[1, 2, 3]` as data, the func should receive `1`, or one dict record in your case). The `Pool` then ensures the data is processed with `N` processes equal to the number of threads your CPU can handle. – Marek Schwarz Oct 23 '19 at 20:13
  • As I understand it, the problem is that the function find_something, instead of returning to the right address (the right position in the program), goes somewhere else and starts executing other parts of my code. What can make the processes return to the wrong addresses? – Engineer_auth1234 Oct 24 '19 at 00:29
  • Could the problem be the global variables, although I only read from them and don't update them? The problem is that I have a lot of big data stored in memory (big DataFrames). As I read, "Do child processes spawned via multiprocessing share objects created earlier in the program?" No (Python before 3.8), and yes in 3.8, so maybe that is causing problems – Engineer_auth1234 Oct 24 '19 at 00:42
  • Maybe threads would be a better solution, as threads have shared memory! – Engineer_auth1234 Oct 24 '19 at 00:50
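
Following up on the last comments about globals and the large DataFrames: one common alternative (a sketch, not something from the answer above) is to hand the big read-only dict to each worker process once via Pool's initializer, so the per-task arguments stay small. The find_in_worker name and its body are placeholders for a version of find_something2 that reads from the worker-local dict:

import multiprocessing as mp

_worker_stocks = None  # set once in each worker process by _init_worker

def _init_worker(stocks):
    # runs once per worker process; the dict is sent (or inherited on fork)
    # once per worker instead of once per task
    global _worker_stocks
    _worker_stocks = stocks

def find_in_worker(keys, threl, my_limit, far):
    # placeholder body: the real version would run the find_something2 logic,
    # reading each frame from the worker-local dict instead of a parent global
    worthing = []
    for stock in keys:
        frame = _worker_stocks[stock]  # worker-local lookup, no per-task copying
        # ... original per-stock logic goes here ...
    return worthing

if __name__ == '__main__':
    key_chunks = [list(reduced_stocks)]  # or split into per-CPU chunks as sketched earlier
    with mp.Pool(initializer=_init_worker, initargs=(reduced_stocks,)) as pool:
        results = pool.starmap(find_in_worker,
                               [(chunk, 1.1, 2200, 1) for chunk in key_chunks])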