2

My processes either run one after another (each one starting only after the previous one has finished), or they start simultaneously but never call the target function. I have tried many variants, but somehow it does not behave the way the tutorials describe. My goal is to fuzzy-match (with fuzzywuzzy) a list of 80k text sentences, dropping unnecessary 90%+ matches while keeping the string with the most information (scorer=fuzz.token_set_ratio). Thank you!

IDE is Anaconda Spyder 4.0, IPython 7.10.1, Python 3.7.5

# -*- coding: utf-8 -*-
import pandas as pd
import multiprocessing
import time
from datetime import datetime
from fuzzywuzzy import fuzz
from fuzzywuzzy import process

# Module-level state shared by the functions below.

# Cleaned, de-duplicated descriptions; assigned by fuzzyPrepare().
preparedDF = list()

# Input slices, one per worker; assigned by fuzzySplit().
df1, df2, df3, df4, df5, df6, df7, df8 = ([] for _ in range(8))

# Result lists, one per worker; assigned by fuzzyMatch().
xdf1, xdf2, xdf3, xdf4, xdf5, xdf6, xdf7, xdf8 = ([] for _ in range(8))

def fuzzyPrepare():
    """Load the descriptions from ``newEN.csv`` into the global ``preparedDF``.

    Reads the ``description`` column, replaces missing values with a
    placeholder, drops duplicate strings (keeping first-seen order) and
    finally removes the placeholder again.  The resulting list is stored in
    the module-level ``preparedDF``; nothing is returned.
    """
    global preparedDF
    placeholder = "#####"
    column = pd.read_csv("newEN.csv")["description"]
    descriptions = column.fillna(placeholder).tolist()
    # dict.fromkeys keeps only the first occurrence of each string, in order.
    descriptions = list(dict.fromkeys(descriptions))
    try:
        # list.remove() works in place and returns None, so its result must
        # not be assigned back (that would replace the whole list with None).
        descriptions.remove(placeholder)
    except ValueError:
        # No missing values were present, so there is no placeholder to drop.
        pass
    preparedDF = descriptions

def fuzzySplit(df=None):
    """Split ``df`` into eight consecutive 100-item slices for the workers.

    Args:
        df: Sequence to slice.  When omitted, the current value of the
            module-level ``preparedDF`` is used.

    The slices are stored in the globals ``df1`` .. ``df8``.  Only the first
    800 items are used; slices that start past the end of ``df`` are empty.
    """
    global df1, df2, df3, df4, df5, df6, df7, df8
    if df is None:
        # Look the global up at call time.  A ``df=preparedDF`` default is
        # evaluated once, when the function is defined, and would stay bound
        # to the initial empty list even after fuzzyPrepare() has run.
        df = preparedDF
    df1, df2, df3, df4 = df[:100], df[100:200], df[200:300], df[300:400]
    df5, df6, df7, df8 = df[400:500], df[500:600], df[600:700], df[700:800]

def fuzzyMatch(x):
    """De-duplicate input slice number ``x`` and store the result globally.

    For ``x`` from 1 to 8 the matching global ``df<x>`` is passed through
    ``process.dedupe`` (threshold 90, ``fuzz.token_set_ratio`` scorer) and the
    outcome is bound, as a list, to the global ``xdf<x>``; nothing is
    returned in that case.  Any other ``x`` returns the string
    ``"error in fuzzyCases!"``.
    """
    global xdf1, xdf2, xdf3, xdf4, xdf5, xdf6, xdf7, xdf8

    def dedupe(sentences):
        # process.dedupe returns a dict_keys object, so turn it into a list.
        survivors = process.dedupe(
            sentences, threshold=90, scorer=fuzz.token_set_ratio
        )
        return list(survivors)

    if x == 1:
        xdf1 = dedupe(df1)
    elif x == 2:
        xdf2 = dedupe(df2)
    elif x == 3:
        xdf3 = dedupe(df3)
    elif x == 4:
        xdf4 = dedupe(df4)
    elif x == 5:
        xdf5 = dedupe(df5)
    elif x == 6:
        xdf6 = dedupe(df6)
    elif x == 7:
        xdf7 = dedupe(df7)
    elif x == 8:
        xdf8 = dedupe(df8)
    else:
        return "error in fuzzyCases!"

# Driver script: load the data, slice it, then run one process per slice.
# The usual ``if __name__ == '__main__':`` guard is left disabled here.
fuzzyPrepare()
fuzzySplit(preparedDF)

# Observation from experimenting: only the variant
#     p1 = multiprocessing.Process(name="p1",target=fuzzyMatch(1), args=(1,))
# actually ran fuzzyMatch.  That expression calls fuzzyMatch(1) right here,
# in this process, and hands its return value to Process as the target, so
# it does not run the function in a child process.

# One worker per slice; worker n is named "pn" and receives n as argument.
jobs = [
    multiprocessing.Process(name="p%d" % n, target=fuzzyMatch, args=(n,))
    for n in range(1, 9)
]
p1, p2, p3, p4, p5, p6, p7, p8 = jobs

# Start the workers one by one, 0.3 s apart, logging each start time.
for j in jobs:
    started_at = datetime.now().strftime('%H:%M:%S')
    print("process "+ j.name +" started at "+ started_at)
    j.start()
    time.sleep(0.3)

# Block until every worker has exited.
for j in jobs:
    j.join()

finished_at = datetime.now().strftime('%H:%M:%S')
print ("processing complete at "+finished_at)
muka90
  • 91
  • 10
  • 1
    ``multiprocessing`` does not share memory. Changes to globals after spawning a ``Process`` is not visible between processes. – MisterMiyagi Dec 21 '19 at 21:20

1 Answers1

1

Ok, you are dealing with a non-trivial problem here. I have taken the liberty to DRY (Don't Repeat Yourself) your code a bit. I also don't have your data or pandas installed, so I have simplified the inputs and outputs. The principles, however, are all the same, and with a few changes you should be able to make your code work!

Attempt #1

I have an array of 800 int elements and each process is going to calculate the sum of 100 of them. Look for # DRY: comments

# -*- coding: utf-8 -*-
import multiprocessing
import time
from datetime import datetime

# ---- shared module-level state ----
# Number of worker processes, and therefore of 100-item input slices.
number_of_proc = 8
# Input data; replaced by fuzzyPrepare().
preparedDF = list()
# DRY: a single list of lists replaces df1..df8; fuzzySplit() appends the
# slices, so each one is reachable by index as dfs[i].
dfs = list()
# DRY: results keyed by the (int) process number instead of xdf1..xdf8.
xdf = dict()

def fuzzyPrepare():
    """Fill the global ``preparedDF`` with fake input data.

    The data is simply ``range(number_of_proc * 100)``, i.e. 100 consecutive
    integers per worker process.
    """
    global preparedDF
    item_count = 100 * number_of_proc
    preparedDF = range(item_count)

def fuzzySplit(df):
    """Append one 100-item slice of ``df`` per worker to the global ``dfs``.

    Slice ``i`` covers the half-open index range ``[i * 100, i * 100 + 100)``;
    each range is printed as it is packed.
    """
    chunk = 100
    # DRY: one loop produces the N slices for the N processes.
    for start in range(0, number_of_proc * chunk, chunk):
        stop = start + chunk
        print("Packing [{}, {})".format(start, stop))
        dfs.append(df[start:stop])

def fuzzyMatch(x):
    """Sum slice ``x`` of ``dfs``, store it under ``xdf[x]`` and print it.

    ``xdf`` is whatever object the global name refers to in the process that
    runs this function.
    """
    # DRY: with a dict keyed by process number the if/elif chain is gone.
    xdf[x] = sum(dfs[x])
    # Read the value back so the printout shows what is actually stored.
    stored = xdf[x]
    print("In process: x={}, xdf[{}]={}".format(x, x, stored))


if __name__ == '__main__':
    # Everything that builds, starts or waits for processes stays under this
    # guard.  ``jobs`` only exists here, so code at module level that used it
    # would raise NameError whenever the file is imported instead of run.
    fuzzyPrepare()
    fuzzySplit(preparedDF)

    # DRY: Create N processes AND append them
    jobs = []
    for proc_no in range(number_of_proc):
        worker = multiprocessing.Process(
            name="p{}".format(proc_no), target=fuzzyMatch, args=(proc_no,)
        )
        jobs.append(worker)

    # Start the workers one by one, 0.3 s apart, logging each start time.
    for j in jobs:
        print("process "+ j.name +" started at "+ datetime.now().strftime('%H:%M:%S'))
        j.start()
        time.sleep(0.3)

    # Block until every worker has exited.
    for j in jobs:
        j.join()

    print ("processing complete at "+datetime.now().strftime('%H:%M:%S'))
    print("results:")
    # This reads the parent's xdf.  The workers only filled their own copies,
    # so the lookup raises KeyError - the failure this attempt demonstrates.
    for x in range(number_of_proc):
        print("In process: x={}, xdf[{}]={}".format(x, x, xdf[x]))

Output:

Packing [0, 100)
Packing [100, 200)
Packing [200, 300)
Packing [300, 400)
Packing [400, 500)
Packing [500, 600)
Packing [600, 700)
Packing [700, 800)
process p0 started at 19:12:00
In process: x=0, xdf[0]=4950
process p1 started at 19:12:00
In process: x=1, xdf[1]=14950
process p2 started at 19:12:00
In process: x=2, xdf[2]=24950
process p3 started at 19:12:01
In process: x=3, xdf[3]=34950
process p4 started at 19:12:01
In process: x=4, xdf[4]=44950
process p5 started at 19:12:01
In process: x=5, xdf[5]=54950
process p6 started at 19:12:01
In process: x=6, xdf[6]=64950
process p7 started at 19:12:02
In process: x=7, xdf[7]=74950
processing complete at 19:12:02
results:
Traceback (most recent call last):
  File "./tmp/proctest.py", line 58, in <module>
    print("In process: x={}, xdf[{}]={}".format(x, x, xdf[x]))
KeyError: 0

What happened? I printed the values in the processing function and they were there?!

Well, I am not an expert, but a Python process works much like fork(). The basic principle is that it will spawn and initialize a new child process. The child process will have a COPY(!) of the parent's memory. This means that the parent and child processes do not share any data/memory!!!

So in our case:

  • We prepare our data
  • We create N processes
  • Each process has a COPY of dfs and xdf variables

While for dfs we do not care too much (since they are only used for input), each process now has its own xdf and not the parent's one! You see why the KeyError?

How to fix this (Attempt #2)

It is now obvious that we need to return data back from the process to the parent. There are many ways of doing this, but the simplest (code-wise) is to use a multiprocessing.Manager to share data between your child processes (look for the # NEW: tag in the code - Note I have only changed 2 lines!):

# -*- coding: utf-8 -*-
import multiprocessing
import time
from datetime import datetime

# NEW: This can manage data between processes
from multiprocessing import Manager

#########
# Shared module-level state.
number_of_proc = 8  # number of worker processes and of 100-item input slices
preparedDF = []  # replaced by fuzzyPrepare() with the generated input data
dfs = []  # fuzzySplit() appends one input slice per worker
# NEW: we create a manager object to store the results
# xdf is a dict proxy obtained from the manager, so values written by the
# worker processes can be read back in the parent.
# NOTE(review): the manager is created at import time, i.e. in every process
# that imports this module - presumably this relies on the fork start method;
# confirm before running where processes are spawned (e.g. Windows).
manager = Manager()
xdf = manager.dict()
#########

def fuzzyPrepare():
    """Fill the global ``preparedDF`` with fake input data.

    The data is simply ``range(number_of_proc * 100)``, i.e. 100 consecutive
    integers per worker process.
    """
    global preparedDF
    item_count = 100 * number_of_proc
    preparedDF = range(item_count)

def fuzzySplit(df):
    """Append one 100-item slice of ``df`` per worker to the global ``dfs``.

    Slice ``i`` covers the half-open index range ``[i * 100, i * 100 + 100)``;
    each range is printed as it is packed.
    """
    chunk = 100
    # DRY: one loop produces the N slices for the N processes.
    for start in range(0, number_of_proc * chunk, chunk):
        stop = start + chunk
        print("Packing [{}, {})".format(start, stop))
        dfs.append(df[start:stop])

def fuzzyMatch(x):
    """Sum slice ``x`` of ``dfs``, store it under ``xdf[x]`` and print it.

    ``xdf`` is the module-level results mapping; the value is read back from
    it for the printout.
    """
    # DRY: Since we now have a dict, all the if-else is not needed any more...
    xdf[x] = sum(dfs[x])
    stored = xdf[x]
    print("In process: x={}, xdf[{}]={}".format(x, x, stored))


if __name__ == '__main__':
    # Everything that builds, starts or waits for processes stays under this
    # guard.  ``jobs`` only exists here, so code at module level that used it
    # would raise NameError whenever the file is imported instead of run.
    fuzzyPrepare()
    fuzzySplit(preparedDF)

    # DRY: Create N processes AND append them
    jobs = []
    for proc_no in range(number_of_proc):
        worker = multiprocessing.Process(
            name="p{}".format(proc_no), target=fuzzyMatch, args=(proc_no,)
        )
        jobs.append(worker)

    # Start the workers one by one, 0.3 s apart, logging each start time.
    for j in jobs:
        print("process "+ j.name +" started at "+ datetime.now().strftime('%H:%M:%S'))
        j.start()
        time.sleep(0.3)

    # Block until every worker has exited.
    for j in jobs:
        j.join()

    print ("processing complete at "+datetime.now().strftime('%H:%M:%S'))
    print("results:")
    # Read every worker's result back from xdf in the parent process.
    for x in range(number_of_proc):
        print("Out of process: x={}, xdf[{}]={}".format(x, x, xdf[x]))

And the output:

Packing [0, 100)
Packing [100, 200)
Packing [200, 300)
Packing [300, 400)
Packing [400, 500)
Packing [500, 600)
Packing [600, 700)
Packing [700, 800)
process p0 started at 19:34:50
In process: x=0, xdf[0]=4950
process p1 started at 19:34:50
In process: x=1, xdf[1]=14950
process p2 started at 19:34:50
In process: x=2, xdf[2]=24950
process p3 started at 19:34:51
In process: x=3, xdf[3]=34950
process p4 started at 19:34:51
In process: x=4, xdf[4]=44950
process p5 started at 19:34:51
In process: x=5, xdf[5]=54950
process p6 started at 19:34:52
In process: x=6, xdf[6]=64950
process p7 started at 19:34:52
In process: x=7, xdf[7]=74950
processing complete at 19:34:52
results:
Out of process: x=0, xdf[0]=4950
Out of process: x=1, xdf[1]=14950
Out of process: x=2, xdf[2]=24950
Out of process: x=3, xdf[3]=34950
Out of process: x=4, xdf[4]=44950
Out of process: x=5, xdf[5]=54950
Out of process: x=6, xdf[6]=64950
Out of process: x=7, xdf[7]=74950

Read more about this here and note the warning about Manager being slower than a multiprocessing.Array (which actually also solves your problem here)

urban
  • 5,392
  • 3
  • 19
  • 45
  • More interesting info [here](https://stackoverflow.com/questions/9436757/how-does-multiprocessing-manager-work-in-python) (as I said, not an expert so i read about this myself :) ) – urban Dec 21 '19 at 19:44
  • urban thank you for your input.Copy pasting your second attempt is resulting in a non working kernel (no outputs at all, same Environment). I need to restart the kernel because it is freezing somehow. – muka90 Dec 27 '19 at 20:18