
I am new to Python. I have a program that loads one big CSV file with over 100k lines, each line having 4 columns. In a for loop I check each row against the same list (dlist) for duplicates; this dlist is a list of DsRef objects that I load with another function.

DsRef class:

from tqdm import tqdm
from multiprocessing import Pool, cpu_count, freeze_support

class DsRef:
    def __init__(self, pn, comp, comp_name, type, diff):
        self.pn = pn
        self.comp = comp
        self.comp_name = comp_name
        self.type = type
        self.diff = diff

    def __str__(self):
        return f'{self.pn} {get_red("|")} {self.comp} {get_red("|")} {self.comp_name} {get_red("|")} {self.type} {get_red("|")} {self.diff}\n'

    def __repr__(self):
        return str(self)  

    def __iter__(self):
        return iter(self.__dict__.items())

Duplication class:

class Duplication:
    def __init__(self, pn, comp, cnt):
        self.pn = pn
        self.comp = comp
        self.cnt = cnt

    def __str__(self):
        return f'{self.pn};{self.comp};{self.cnt}\n'

    def __repr__(self):
        return str(self)

    def __hash__(self):
        return hash(('pn', self.pn,
                 'comp', self.comp))

    def __eq__(self, other):
        return self.pn == other.pn and self.comp == other.comp 
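
Since Duplication defines __hash__ and __eq__ on the (pn, comp) pair, a set of Duplication objects deduplicates on exactly those two fields, which is what the set(results) call in the working function later in the question relies on. A minimal illustration:

d1 = Duplication("TTT_XXX", "CCC_VVV", 2)
d2 = Duplication("TTT_XXX", "CCC_VVV", 2)
print(len({d1, d2}))  # prints 1, because the two objects hash and compare equal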

Sample data for testing (instead of loading the real file):

dlist = []
dlist.append(DsRef("TTT_XXX", "CCC_VVV", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef("TTT_XCX", "CCC_VVV", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef("TTT_XXX", "CCC_VCV", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef("TTT_XXX", "CCC_VVV", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef("TTT_XYX", "CCC_YYY", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef("TAT_XQX", "CCC_VVV", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef("ATT_XXX", "CCC_VQV", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef("TTT_EEE", "CCC_VVV", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef("TTT_XWX", "CCC_VVV", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef("TTT_XXX", "CCC_VWV", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef("TTT_EEE", "CCC_VVV", "CTYPE", "CTYPE", "text"))

Function that should find and return the rows with duplicated values:

def FindDuplications(dlist):
    duplicates = []
    for pn, comp in enumerate(dlist):            
        matches = [xpn for xpn, xcomp in enumerate(dlist) if pn == xpn and comp == xcomp]
        duplicates.append(Duplication(pn, comp, len(matches)))
    return duplicates

The idea is: if `row.pn == x.pn and row.comp == x.comp` is true, I have found a duplication, so I compare the first 2 attributes of each object with every other object in the list.

Now I am trying to use something like the following to use all processor cores for a faster result, because it currently takes over 15 minutes:

if __name__ == '__main__':
    freeze_support()
    p = Pool(cpu_count())
    duplicates = p.map(FindDuplications, dlist)
    p.close()
    p.join()

At first I got an error that the class is not iterable, so I created the `__iter__` method for the first class. After that I got an error that a tuple object has no `pn` or `comp` attribute, so I switched to `enumerate(dlist)` in the for loop, but it still does not work.

Could you please help me?

I would also like to use tqdm to show the progress of the duplicate-finding function.


Here is the original working function, without multiprocessing:

def CheckDuplications(dlist):
    print(get_yellow("========= CHECK CROSS DUPLICATIONS ========="))
    duplicates = []
    for r in tqdm(dlist):
        matches = [x for x in dlist if r.pn == x.pn and r.comp == x.comp]
        duplicates.append(Duplication(r.pn, r.comp, len(matches)))

    results = [d for d in duplicates if d.cnt > 1]
    results = set(results)
    return results

From the FindDuplications function I get a list of DsRef objects (a simple copy), but it must return a list of Duplication objects, so something is wrong.

Thank you

Jan Sršeň
  • What is the problem exactly? I just tried running your code, and it seems to execute fine at least. Is the output not what you expect? – LotB Dec 12 '19 at 17:43
  • When I run the **CheckDuplications** function it works fine, but it uses only one of my 12 logical processor cores. With **FindDuplications** I would like to use all logical cores, or at least more than one, for a faster result. When I run the multiprocess function **FindDuplications** the script ends after 3 seconds, but **CheckDuplications** takes over 17 minutes on my 100k+ rows. – Jan Sršeň Dec 12 '19 at 18:05
  • Ooh, I think I see your problem: `pool.map()` calls the given function on every item independently. `FindDuplications` doesn't receive the full list, so it can't access the rest of the list to find the other duplicates. – LotB Dec 12 '19 at 18:22
  • By the way, Python convention uses snake_case for functions, so it should be `find_duplications`. – LotB Dec 12 '19 at 18:23
  • OK, snake_case is fine, but do you have an idea how to solve or fix this problem, please? – Jan Sršeň Dec 12 '19 at 18:53
  • I don't really like this, but you could declare the source list as a `global` variable and compare against that. It would be fine as long as you keep the logic read-only. – LotB Dec 12 '19 at 19:23
  • Hi, I declared ***dlist*** as global, but nothing changed. Is there another option besides making dlist global? – Jan Sršeň Dec 13 '19 at 06:47
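
As a sketch of the suggestion from the comments: the reason making dlist global changes nothing is that worker processes do not see globals you assign after the pool is created. One way to actually hand a read-only dlist to every worker is a Pool initializer. This is an untested sketch that reuses the Duplication class from the question and the snake_case name suggested above:

from multiprocessing import Pool, cpu_count

_dlist = None  # per-worker, module-level copy of the shared list

def init_worker(shared_dlist):
    # Runs once in every worker process and stores the list for later lookups.
    global _dlist
    _dlist = shared_dlist

def find_duplications(index):
    # Compare one row against the whole shared list.
    r = _dlist[index]
    matches = [x for x in _dlist if r.pn == x.pn and r.comp == x.comp]
    return Duplication(r.pn, r.comp, len(matches))

if __name__ == '__main__':
    with Pool(cpu_count(), initializer=init_worker, initargs=(dlist,)) as p:
        duplicates = p.map(find_duplications, range(len(dlist)))
    results = {d for d in duplicates if d.cnt > 1}

Passing only an index per call looks chatty, but Pool.map batches the indices into chunks automatically, so the per-item overhead stays small.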

1 Answer


There were a few problems in the code: it wasn't actually parallelized. You can't just hand single-threaded code with a heavy task to multiple cores; the code needs some adaptation.

Ok, anyway, here we are :)

from math import ceil
from multiprocessing import Pool, cpu_count, freeze_support


def get_red(val):
    return val


class DsRef:
    def __init__(self, pn, comp, comp_name, type, diff):
        self.pn = pn
        self.comp = comp
        self.comp_name = comp_name
        self.type = type
        self.diff = diff

    def __str__(self):
        return f'{self.pn} {get_red("|")} {self.comp} {get_red("|")} {self.comp_name} {get_red("|")} {self.type} {get_red("|")} {self.diff}\n'

    def __repr__(self):
        return str(self)


class Duplication:
    def __init__(self, pn, comp, cnt):
        self.pn = pn
        self.comp = comp
        self.cnt = cnt

    def __str__(self):
        return f'{self.pn};{self.comp};{self.cnt}\n'

    def __repr__(self):
        return str(self)

    def __hash__(self):
        return hash(('pn', self.pn,
                     'comp', self.comp))

    def __eq__(self, other):
        return self.pn == other.pn and self.comp == other.comp


dlist = []
dlist.append(DsRef("TTT_XXX", "CCC_VVV", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef("TTT_XCX", "CCC_VVV", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef("TTT_XXX", "CCC_VCV", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef("TTT_XXX", "CCC_VVV", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef("TTT_XYX", "CCC_YYY", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef("TAT_XQX", "CCC_VVV", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef("ATT_XXX", "CCC_VQV", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef("TTT_EEE", "CCC_VVV", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef("TTT_XWX", "CCC_VVV", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef("TTT_XXX", "CCC_VWV", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef("TTT_EEE", "CCC_VVV", "CTYPE", "CTYPE", "text"))


def FindDuplications(task):
    # Each task is a tuple: (full list, start index of this worker's chunk, chunk size).
    dlist, start, count = task

    duplicates = []
    # Walk only this worker's slice, but compare every row against the whole list.
    for r in dlist[start:start + count]:
        matches = [x for x in dlist if r.pn == x.pn and r.comp == x.comp]
        duplicates.append(Duplication(r.pn, r.comp, len(matches)))

    # Keep only rows that occur more than once.
    return {d for d in duplicates if d.cnt > 1}


if __name__ == '__main__':
    freeze_support()

    # Split the list into one chunk per worker (one process per CPU core).
    threads = cpu_count()
    tasks_per_thread = ceil(len(dlist) / threads)

    tasks = [(dlist, tasks_per_thread * i, tasks_per_thread) for i in range(threads)]

    p = Pool(threads)
    duplicates = p.map(FindDuplications, tasks)
    p.close()
    p.join()

    # Merge the per-worker sets into one set of Duplication objects.
    duplicates = {item for sublist in duplicates for item in sublist}

    print(duplicates)
    print(type(duplicates))

It works well for me, returns the same results as the single-threaded function, and runs on all available cores in parallel.

Output

python test.py
{TTT_EEE;CCC_VVV;2
, TTT_XXX;CCC_VVV;2
}
<class 'set'>
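
If you also want the tqdm progress bar mentioned in the question, one option (a sketch, not measured on the full 100k-row data) is to replace pool.map with pool.imap, which yields chunk results as they finish, and wrap that iterator in tqdm:

from tqdm import tqdm

p = Pool(threads)
# imap yields one result per task as it completes, so tqdm can advance the bar.
duplicates = list(tqdm(p.imap(FindDuplications, tasks), total=len(tasks)))
p.close()
p.join()

With only one task per core the bar advances just a handful of times; splitting dlist into more, smaller chunks gives a smoother progress display.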
Alexandr Shurigin
  • Hi Alexandr, thanks, now it works in parallel on all cores, but I need it to return only **Duplication** objects, not **DsRef**. I need to load all data into Duplication objects and then select only those where the cnt attribute is larger than 1, i.e. only the duplications. That means each item from the task list (a DsRef object) must be checked against all rows in dlist. Also, is it possible to debug the parallel function with a VS Code breakpoint? – Jan Sršeň Dec 16 '19 at 14:15
  • The source code uses your function, just modified for multiprocess usage. I didn't change anything in its logic around `duplicates.append(Duplication(pn, comp, len(matches)))`. – Alexandr Shurigin Dec 16 '19 at 14:17
  • The original (non-parallel) function returns a list of Duplication objects, but this one returned Duplication objects whose comp attribute held a DsRef object, while the Duplication class should have only string attributes plus the cnt attribute for the number of duplications. – Jan Sršeň Dec 16 '19 at 14:28
  • Woof, I didn't see that you had changed the original function. OK, I updated the answer to match the original logic. Please check now. – Alexandr Shurigin Dec 16 '19 at 14:47
  • I didn't change anything, but it doesn't matter. I need to pass 2 parameters into the function (the per-core task list and also the full dlist), because each task must be checked against all data in the original list. – Jan Sršeň Dec 16 '19 at 15:13
  • Check the FindDuplications function declaration and its first line; everything required is already passed in. – Alexandr Shurigin Dec 16 '19 at 15:18