0

I am learning how to work with Python's multiprocessing, and the following are virtually identical block of codes, one is written in single threaded, single process, and the other is multiprocessing, the speed of processing for multi-processing is so much worse than single one,

both working on the same 38910 records of gettext PO file, with english and Vietnamese translations

  • single time:
# Execution took: 4.229461978015024 second(s)

and

  • multi-processing took:
# Execution took: 35.94734842295293 second(s).

nearly 9 times worse off

here is the code:

#!/usr/bin/env python3

import re

from matcher import MatcherRecord
import os
from multiprocessing import Pool
from multiprocessing.managers import BaseManager

from babel.messages import Message
from sphinx_intl import catalog as c
from collections import OrderedDict
from translation_finder import TranslationFinder
from pattern_utils import PatternUtils as pu
from definition import Definitions as df
import time


class PatternFoundResult:
    def __init__(self):
        self.found_dict: OrderedDict = None

    def updateFoundResult(self, found_dict: OrderedDict):
        self.found_dict = found_dict
        is_found = len(found_dict) > 0
        if not is_found:
            return

        print(found_dict)


def task001(arg):
    def genericAbbrevFormat(orig_txt: str, trans_txt: str, is_reverse=False):
        def replaceAbbrev(entry):
            loc = entry[0]
            mm: MatcherRecord = entry[1]
            sub_list = mm.getSubEntriesAsList()
            (abbrev_loc, abbrev_txt) = sub_list[1]
            changing_txt = replaceAbbrev.txt

            try:
                (exp_loc, exp_txt) = sub_list[3]
            except Exception as e:
                (exp_loc, exp_txt) = sub_list[2]

            abbrev_txt = f'[{abbrev_txt} - {exp_txt}]'
            changed_txt = pu.jointText(changing_txt, abbrev_txt, loc)
            replaceAbbrev.txt = changed_txt
            return True

        abbrev_dict = pu.patternMatchAll(df.ABBR_WITH_PRE_POST_QUOTES, trans_txt, is_reversed=True)
        has_abbrev_embedded = (len(abbrev_dict) > 0)
        if has_abbrev_embedded:
            replaceAbbrev.txt = trans_txt
            list(filter(replaceAbbrev, abbrev_dict.items()))
            trans_txt = replaceAbbrev.txt

        orig_txt = df.squareBracket(orig_txt)

        is_tran = (trans_txt is not None)
        if not is_tran:
            format_string = f':abbr:`{orig_txt} ()`'
            return format_string

        trans_txt = df.squareBracket(trans_txt)
        if is_reverse:
            format_string = f':abbr:`{orig_txt} ({trans_txt})`'
        else:
            format_string = f':abbr:`{trans_txt} ({orig_txt})`'
        return format_string

    def isGlossary(m: Message):
        check_string = 'manual/glossary/index'
        locations = m.locations
        is_glossary = False

        for loc in locations:
            is_glossary = (check_string in locations)
            if is_glossary:
                break
        return is_glossary

    def formatFoundEntry(entry):
        mm: MatcherRecord = None
        (loc, mm) = entry

        en_txt = mm.getComponent(2, 1)
        vn_txt = tf.isInDict(en_txt)

        has_tran = (vn_txt is not None)
        if has_tran:
            in_catalog = tf.isEnGoesFirst(en_txt)
            is_en_coming_first = (in_catalog or is_glossary)
            abbrev_txt = genericAbbrevFormat(en_txt, vn_txt, is_reverse=is_en_coming_first)
            front_filler = mm.getComponent(1, 1)
            back_filler = mm.getComponent(3, 1)
            ast_txt = f'{front_filler}{abbrev_txt}{back_filler}'
            return ast_txt
        else:
            return en_txt

    pat: re.Pattern = None
    m: Message = None
    tf: TranslationFinder = None

    (index, m, tf, pat, is_simple) = arg
    en_txt = m.id

    is_glossary = isGlossary(m)
    is_repeat = tf.isRepeat(en_txt)

    found_dict = pu.patternMatchAll(pat, en_txt)
    is_found = len(found_dict) > 0
    if is_found:
        result_string_list = list(map(formatFoundEntry, found_dict.items()))
    else:
        result_string_list = []
    return result_string_list


if __name__ == "__main__":
    time_start = time.perf_counter()

    is_debug = False
    home_dev = os.environ['DEV']
    input_path = os.path.join(home_dev, "current_blender_manual_merge_flat_0001.po")
    input_cat = c.load_po(input_path)

    BaseManager.register('TranslationFinder', TranslationFinder)
    manager = BaseManager()
    manager.start()

    tf = manager.TranslationFinder()
    pat:re.Pattern = df.QUOTEDTEXT_UNTRANSLATED_PATTERN

    is_simple = True
    result_handler = PatternFoundResult()
    with Pool() as pool:
        m: Message = None
        for (index, m) in enumerate(input_cat):
            arg = (index, m, tf, pat, is_simple)
            pool.apply_async(task001, args=[arg], callback=result_handler.updateFoundResult)
        pool.close()
        pool.join()

    print(f'Execution took: {time.perf_counter() - time_start} second(s).')

and here is the single threaded, the main section:

if __name__ == "__main__":

    time_start = time.perf_counter()

    is_debug = False
    home_dev = os.environ['DEV']
    input_path = os.path.join(home_dev, "current_blender_manual_merge_flat_0001.po")
    input_cat = c.load_po(input_path)


    result_handler = PatternFoundResult()

    tf = TranslationFinder()
    pat:re.Pattern = df.QUOTEDTEXT_UNTRANSLATED_PATTERN
    is_simple = True
    for (index, m) in enumerate(input_cat):
        arg = (index, m, tf, pat, is_simple)
        result_string_list = task001(arg)
        result_handler.updateFoundResult(result_string_list)
    print(f'Execution took: {time.perf_counter() - time_start} second(s) - records: {len(input_cat)}')

Could you please tell me how so, where did I go wrong?

  • In order to know what the issue is, you will have to [profile](https://stackoverflow.com/questions/582336/how-do-i-profile-a-python-script) both scripts. – Random Davis May 05 '23 at 16:04
  • 2
    You pass lots of potentially long strings from the parent process to subprocess. This operation is costly, requires cross process allocations and copies. It is possible that it dominates the execution time. – freakish May 05 '23 at 16:08
  • despite being a not so long code, it features severalimports for other files which make imossible for any other person to try your code and help with any bottlenecks. Instead, we cn just stare at the code. Does `tf` have to be aregistred with a Manager for example? From looking at the code, ityou get all your data back with the `return` - managers are complex thigns and potential sources of slowdown (can't know without profiling, though). – jsbueno May 05 '23 at 16:37
  • 1
    All that said, what really looks is that the processing you perform in a single message takes much less resources than serializing all the arguments at each function call. You could try to batch "m" in blocks, say, send at least 100 of them for a subprocess at each call and check if these timings change somehow. – jsbueno May 05 '23 at 16:38
  • I do have a gut feeling about the sharing tf (translation finder) in the manager. I do not know the underlining implementation of this module but if it lock and unlock the access of each call then this is indeed the bottleneck of the program. I don't know if that is true and if it is then how should you resolve that problem. – Hoang Duy Tran May 05 '23 at 17:20
  • I am sorry but there are too many modules in the potranslate I could not share all the source code. But basically, potranslate works with underlying dictionaries which holds many PO translation files, one for translations, one for the english goes first records, one for repeat lines (title and subtitle lines, where English and Vietnamese will be quoted, ie. repeat the meaning) etc.. also a dictionary for keyboard, in memory and not in file. so the tf (translation finder) can be quite memory heavy, – Hoang Duy Tran May 05 '23 at 17:30
  • but once load, it works fairly quickly, acceptable for script base language like Python. I just don't know how to streamline the sharing for processes, I mean I don't know how yet. – Hoang Duy Tran May 05 '23 at 17:30
  • Ok, I have made a zip file of all files that is needed to test this problem. You can access the zip file and download it from here (https://drive.google.com/file/d/1zARtXAQJJUTbC7PpeOWTCnE9N5qy7ya0/view?usp=sharing). Please put them into your $HOME/bin directory and unpack it there. It should be under $HOME/bin/local. Thank you. – Hoang Duy Tran May 05 '23 at 18:11
  • Oh, sorry, you also needed to download this 'requirements.txt' and place it anywhere, and runs the ```python -m pip install -r requirements.txt``` to install 'Sphinx' stuff (https://www.sphinx-doc.org/en/master/). This link please: (https://drive.google.com/file/d/1x2hYRexanVZy1FvUNBDUsmTWw6g0MKtp/view?usp=share_link) – Hoang Duy Tran May 05 '23 at 18:26
  • I have listen to your advice and ran the profiler, here is the link to the profiler file (https://drive.google.com/file/d/1lIMOTTkaZBkfnGRzU8RoFFxqi-F_3a2E/view?usp=sharing). This profile is only on the testFindMulti.py (Multiprocessing code) – Hoang Duy Tran May 05 '23 at 18:46
  • I think my instincts is correct, running the cProfile again within the code, with pstats sorting by TIME, showed this: `` Execution took: 41.18419484200422 second(s). 8054139 function calls (8053751 primitive calls) in 41.036 seconds Ordered by: internal time ncalls tottime percall cumtime percall filename:lineno(function) 15 32.815 2.188 32.815 2.188 {method 'acquire' of '_thread.lock' objects} `` meaning the acquire, _thread.lock on objects – Hoang Duy Tran May 05 '23 at 19:20
  • This is the link to the profile file I've done with TIME sorted: (https://drive.google.com/file/d/1RWbEbSh14M4mCJj_h0YRDqTwrEVBo7Zj/view?usp=sharing) – Hoang Duy Tran May 05 '23 at 19:42
  • I have listened to your advice on batching and it works, reduced the execution time down to```Execution took: 5.327071494999927 second(s). 38910 records processed.```. Still slower than single process, though much better. – Hoang Duy Tran May 06 '23 at 08:08

0 Answers0