Here is my user-defined function:

import datetime
import os
import time
import pandas as pd
from multiprocessing import Pool, freeze_support

yms = ['201901', '201902', '201903', '201904', '201905', .... , '201812']

def minc(ym):
    print('MINC %s %s\n' %(ym, str(datetime.datetime.now())))
    print('value %s is in PID : %s \n' %(ym, os.getpid()))
    t = datetime.datetime.now()
    minc1 = pd.read_sql("""
    select substring(MINC_IN_YM, 1, 4) as YEAR, substring(MINC_IN_YM, 5, 2) as MONTH,
    count(MINC_INSP_NO) as NROWS,
    sum(MINC_OKQTY) as TOTAL_QUANTITY,
    sum(MINC_AV_PRICE*MINC_OKQTY) as TOTAL_DOLLARS
    from dwadm.W_MINC
    where MINC_INC_INF in ('RN', 'CN')
    and MINC_ACCID in ('A', 'G', 'V')
    and MINC_IN_YM = '%s'
    and substring(MINC_BRNCD, 1, 1) not in ('S', 'C')
    GROUP BY YEAR, MONTH, MINC_BRNCD, MINC_BRNCD_WHS, MINC_VNDCD, MINC_PTNO
    """ % ym, conn)
    print('MINC ends %s %s\n' % (ym, str(datetime.datetime.now())))
    
    return minc1

and I want to run 'minc()' in parallel using the 'parallelized()' function below:

def parallelized():
    if __name__ == '__main__' :
      pool = Pool(processes = 8)
      df = pool.map(minc, yms)
      pool.close()
      pool.join()

parallelized()

or

if __name__ == '__main__':
    freeze_support()
    t1 = time.time()
    pool = Pool(8)
    pool.map(minc, yms)
    pool.close()
    pool.join()

But when I run the code above, it never stops; it just keeps running forever, and I don't know how to solve this. When I use a basic function like 'sum()' instead of my user-defined function 'minc()', the parallel processing runs fine. Is there a way to get parallel processing of user-defined functions to work properly?

Please help me!! :(

This is the output when the parallelized() function was executed:

MINC 201902 2020-11-03 08:15:23.469221

MINC 201901 2020-11-03 08:15:23.469228

MINC 201903 2020-11-03 08:15:23.469482

MINC 201904 2020-11-03 08:15:23.469703

MINC 201905 2020-11-03 08:15:23.469915

MINC 201906 2020-11-03 08:15:23.470106

MINC 201907 2020-11-03 08:15:23.470283

value 201902 is in PID : 1222

MINC 201908 2020-11-03 08:15:23.470459

value 201901 is in PID : 1221

value 201903 is in PID : 1223

value 201904 is in PID : 1224

value 201905 is in PID : 1225

value 201906 is in PID : 1226

value 201907 is in PID : 1227

value 201908 is in PID : 1228

MINC 201909 2020-11-03 08:15:23.489395

value 201909 is in PID : 1222

1 Answer

This is a simplified version of your code, and the way you are using multiprocessing.Pool seems correct. The program can hang if the code in the subprocesses is malfunctioning. Are you sure that your pd.read_sql code exits with a good status?

import datetime
from multiprocessing.pool import Pool
from os import getpid

yms = range(1, 10000)


def minc(ym):
    print('MINC %s %s %s\n' % (ym, getpid(), str(datetime.datetime.now())))

    return ym


def parallelized():
    pool = Pool(processes = 8)
    df = pool.map(minc, yms)
    print("FINISHED")
    assert list(yms) == df

    # you don't really have to close/join but it is still a good idea
    # https://stackoverflow.com/a/38271957/598057
    pool.close()
    pool.join()

if __name__ == '__main__':
    parallelized()

Stanislav Pankevich
  • When I run the uploaded code, do 8 pids have to be repeated? – Seong Ung Choi Nov 01 '20 at 23:24
  • When not parallel processing, I have found that user-defined function runs fine. :( – Seong Ung Choi Nov 01 '20 at 23:25
  • There is no problem importing data from DB in each jupyter notebook without using 'pool.map'. Do you know what's the difference between separately loading from DB and loading with map() at once? – Seong Ung Choi Nov 02 '20 at 01:28
  • `When I run the uploaded code, do 8 pids have to be repeated?` Yes, because that's why Pool exists. The same pool with the same processes is used for tasks. – Stanislav Pankevich Nov 02 '20 at 09:18
  • Could you add your output log to your question? – Stanislav Pankevich Nov 02 '20 at 09:19
  • I set the log of the info level to appear on the console and saved the logs of the debug and error levels separately, but nothing was printed or saved. I uploaded the output that came out when the parallelized() function in the question was executed. It keeps running in that state. – Seong Ung Choi Nov 02 '20 at 23:20
  • You can see that there is no single `MINC ends` line. This means that the `pd.read_sql` calls do not complete. I cannot help further without having access to your environment. For one thing, you could check what is happening in the PostgreSQL logs - you should see what is happening to your queries. – Stanislav Pankevich Nov 03 '20 at 09:41
  • Another comment: I encourage you to ask one more question: it looks like this task that you are trying to parallelize could be accomplished by writing a good PostgreSQL query. You would not have to parallelize anything with Pandas if you could ask for this data in one or a few correctly crafted PG queries. – Stanislav Pankevich Nov 03 '20 at 09:42
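
Following up on the comments above, and only as a guess that is not part of the original answer: one common reason for pd.read_sql hanging only under pool.map is that the module-level conn was created in the parent process and then inherited by the forked workers, and most database connections are not fork-safe. Below is a minimal sketch of the usual workaround, where each worker builds its own connection via a Pool initializer. make_connection() is a hypothetical placeholder for however conn was originally created (for example psycopg2.connect(...)), and the query is abbreviated from the question.

import pandas as pd
from multiprocessing import Pool

yms = ['201901', '201902', '201903']   # shortened list for the sketch

conn = None   # filled in per worker process by init_worker()

def init_worker():
    # Build a fresh connection inside each worker instead of inheriting
    # the parent's conn across fork(). make_connection() is a placeholder
    # for however conn was originally created.
    global conn
    conn = make_connection()

def minc(ym):
    # Abbreviated version of the query from the question, executed on
    # this worker's own connection.
    return pd.read_sql("select count(MINC_INSP_NO) as NROWS "
                       "from dwadm.W_MINC where MINC_IN_YM = '%s'" % ym, conn)

if __name__ == '__main__':
    with Pool(processes=8, initializer=init_worker) as pool:
        dfs = pool.map(minc, yms)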
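
As for the last comment's suggestion of a single PostgreSQL query: here is a rough sketch of what that could look like, reusing the table and column names from the question. It is not code from the answer itself; the conn object and the exact month range are assumptions.

import pandas as pd

# One pd.read_sql call for all months: drop the per-month
# "MINC_IN_YM = '%s'" filter and let GROUP BY split the result
# by year and month. conn and the YM range are assumed.
all_months = pd.read_sql("""
    select substring(MINC_IN_YM, 1, 4) as YEAR,
           substring(MINC_IN_YM, 5, 2) as MONTH,
           count(MINC_INSP_NO) as NROWS,
           sum(MINC_OKQTY) as TOTAL_QUANTITY,
           sum(MINC_AV_PRICE*MINC_OKQTY) as TOTAL_DOLLARS
    from dwadm.W_MINC
    where MINC_INC_INF in ('RN', 'CN')
      and MINC_ACCID in ('A', 'G', 'V')
      and MINC_IN_YM between '201812' and '201905'
      and substring(MINC_BRNCD, 1, 1) not in ('S', 'C')
    GROUP BY YEAR, MONTH, MINC_BRNCD, MINC_BRNCD_WHS, MINC_VNDCD, MINC_PTNO
""", conn)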