
I'm trying to write a generic class that runs a function in parallel using a pool of worker processes. It works when the function takes only one argument, but no matter how I modify it, I can't make it work with two arguments.

One complication is that I have no control over the functions I'm calling. I won't know their parameter names, and I can't modify them, either to change the order of the parameters or to make them accept a single iterable instead of separate parameters.

I've searched several pages and scratched my head for a while, but it just isn't making sense to me. Some of the pages I've read are:

Python multiprocessing pool.map for multiple arguments

http://python.omics.wiki/multiprocessing_map/multiprocessing_partial_function_multiple_arguments

https://www.reddit.com/r/learnpython/comments/8f2p4d/call_a_function_with_multiple_arguments_with/

I've trimmed my code down to the bare example below. Generating boxes isn't a great use case for parallel processing, but it will do for the example. The output should be several boxes of different sizes, each made of the @ character.

I need to step away from my code for a bit and clear my head, so I thought I'd ask for some help here while I'm away from the keyboard.

from multiprocessing import Pool

class Parallel():
    """
    Allows one function to be executed in several worker processes at the same time, each call with its own parameters

    Attributes:
        function: func
            Function to be executed in parallel
        parameter_list: list
            A list of values to be processed by the function
        thread_limit: int
            Maximum number of worker processes that can run at one time
        wait: bool
            Will wait for all the functions to complete if True
    """

    def __init__(self, function, parameter_list, thread_limit=4, wait: bool = False):
        """
        The constructor for the Parallel class.

        :param function: Function to be executed in parallel
        :param parameter_list: A list of values to be processed by the function
        :param thread_limit: The maximum number of worker processes that can run at once
        :param wait: If True, block until all calls have completed
        """

        # Create a pool of worker processes to run our jobs
        self._pool = Pool(processes=thread_limit)

        # self._x = self._pool.map_async(function, parameter_list, chunksize=1)  # can't multiply sequence by non-int of type 'tuple'

        self._x = self._pool.starmap_async(function, parameter_list, chunksize=1)  # box() takes from 1 to 2 positional arguments but 19 were given

        # self._x = self._pool.apply_async(function, parameter_list)  # can't multiply sequence by non-int of type 'tuple'

        self._state = "busy"
        self._process_count = len(parameter_list)

        if wait:
            self._x.wait()
            self._state = "done"

    def get(self):
        """
        Return the results of the function calls. Blocks until all calls have finished.

        Returns:
            list: A list of results
        """

        self._x.wait()
        self._state = "done"

        return self._x.get()


def box(size: int, fill: str = "#"):
    """ 
    Returns a square of the specified size, consisting of the fill character
    """
    print("FILL:{}".format(fill))  # Debugging
    print("SIZE:{}".format(size))  #
    line = fill * size + "\n"
    a_box = line * size

    return a_box

if __name__ == '__main__':

    # Generate boxes from 2 to 20 characters in size, made of the "@" character
    box_sizes = (2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
    fill_char = ("@", "@", "@", "@", "@", "@", "@", "@", "@", "@", "@", "@", "@", "@", "@", "@", "@", "@", "@")

    p = Parallel(function=box, parameter_list=(box_sizes, fill_char), wait=True)  # How to write this line???

    results = p.get()

    for box in results:
        print(box)
        print("")
Calab


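After reviewing a short description of starmap, I think I understand where my issue was. I was passing my parameters as two sequences (all of the sizes, then all of the fill characters) instead of one tuple of arguments per call. starmap unpacks each element of the iterable into a separate call, so the first call received all 19 sizes at once. That is where "takes from 1 to 2 positional arguments but 19 were given" came from.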
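The difference can be seen without any multiprocessing at all. `starmap` calls `function(*item)` for each item of its iterable, so the shape of that iterable is what matters. This is a small sketch with shortened inputs, which are only for illustration:

```python
from itertools import repeat

box_sizes = (2, 3, 4)
fill_char = ("@", "@", "@")

# What the question passed: one tuple holding two whole sequences.
# starmap would unpack each element, so the first call would be box(2, 3, 4).
wrong = (box_sizes, fill_char)
first_call_args = wrong[0]

# What starmap expects: one tuple of arguments per call.
pairs = list(zip(box_sizes, fill_char))

# With a constant fill, itertools.repeat avoids building a tuple of "@"s;
# zip stops at the shortest input, so the endless repeat is safe.
pairs_from_repeat = list(zip(box_sizes, repeat("@")))

print(first_call_args)             # (2, 3, 4)
print(pairs)                       # [(2, '@'), (3, '@'), (4, '@')]
print(pairs == pairs_from_repeat)  # True
```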

The solution I came up with works: I created a second class that handles multiple parameters. Here is the code, including a few tests to confirm that it works.

from multiprocessing import Pool

class Parallel():
    """
    Allows one function to be executed in several worker processes at the same time, each call with its own single parameter
    """

    def __init__(self, function, parameter_list, thread_limit=4, wait: bool = False):
        """
        The constructor for the Parallel class.

        :param function: Function to be executed in parallel
        :param parameter_list: A list of values to be processed by the function
        :param thread_limit: The maximum number of worker processes that can run at once
        :param wait: If True, block until all calls have completed
        """
        # Create a pool of worker processes to run our jobs
        self._pool = Pool(processes=thread_limit)

        self._x = self._pool.map_async(function, parameter_list, chunksize=1)

        self._state = "busy"
        self._process_count = len(parameter_list)

        if wait:
            self._x.wait()
            self._state = "done"

    def get(self):
        """
        Return the results of the function calls. Blocks until all calls have finished.

        Returns:
            list: A list of results
        """
        self._x.wait()
        self._state = "done"

        return self._x.get()


class Parallel2(Parallel):
    """
    Allows one function to be executed in several worker processes at the same time, each call with multiple parameters
    """

    def __init__(self, function, parameter_list, thread_limit=4, wait: bool = False):
        """
        The constructor for the Parallel2 class.

        :param function: Function to be executed in parallel
        :param parameter_list: A list of multiple values to be processed by the function
        :param thread_limit: The maximum number of worker processes that can run at once
        :param wait: If True, block until all calls have completed
        """
        # Create a pool of worker processes to run our jobs
        self._pool = Pool(processes=thread_limit)

        self._x = self._pool.starmap_async(function, parameter_list, chunksize=1)

        self._state = "busy"
        self._process_count = len(parameter_list)

        if wait:
            self._x.wait()
            self._state = "done"


def box(size: int, fill: str = "#"):
    """ 
    Returns a square of the specified size, consisting of the fill character

    :param size: Size of the box
    :param fill: Character used to make the box

    :return: The box in a string
    """
    line = fill * size + "\n"
    a_box = line * size

    return a_box


def just_box(size: int):
    """
    Return a square of the specified size, consisting of the "#" character

    :param size: Size of the box

    :return: The box in a string
    """
    return box(size, fill="#")


def repeat_names(infos):
    """
    Repeats a name a specified number of times
    :param infos: A list containing a name, and a number to repeat the name by

    :return: The name, repeated
    """
    name_str = ""
    for x in range(infos[1]):
        name_str += infos[0]

    return name_str


if __name__ == '__main__':

    # Generate boxes of multiple sizes, made of the "@" character
    box_sizes = (2, 5, 6, 9, 10, 13, 15, 16, 19, 20)
    fill_char = "@"

    param = []
    for box_size in box_sizes:
        param.append((box_size, fill_char))

    # Test passing multiple arguments to a function
    p1 = Parallel2(function=box, parameter_list=param, wait=True, thread_limit=15)

    results = p1.get()

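    # Don't name the loop variable `box`: that rebinds the box() function, and
    # workers forked for the later pools would inherit the rebound name.
    for a_box in results:
        print(a_box)
        print("")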


    # Test passing a single argument to a function
    p2 = Parallel(function=just_box, parameter_list=box_sizes, wait=True, thread_limit=15)

    results = p2.get()

    for a_box in results:
        print(a_box)
        print("")


    # Test passing a tuple as a single argument to a function
    name_list = [("Bob", 3), ("Mary", 5), ("John", 2)]
    p3 = Parallel(function=repeat_names, parameter_list=name_list, wait=True)

    results = p3.get()
    for result in results:
        print(result)
    print("")


    # Test passing a list as a single argument to a function
    name_list = [["Bob", 3], ["Mary", 5], ["John", 2]]
    p3 = Parallel(function=repeat_names, parameter_list=name_list, wait=True)

    results = p3.get()
    for result in results:
        print(result)
Calab