1

I'm trying to simulate different tasks running in parallel in Python, and each parallel process is run at different frequencies (e.g. 200 Hz, 100 Hz and 50 Hz). I used code from this question to make a Timer class to run these processes in "Real-Time", but the processes de-synchronize over time (e.i., three 200 Hz tasks can sometimes run in between two 100 Hz tasks).

To synchronize my processes I make tick counters in their shared memory. Every iteration of the 200 Hz process increments a counter, then waits for it to be reset to 0 when the counter reaches 2, while every iteration of 100 Hz process waits for that counter to reach 2 before resetting it. Same thing for the 50 Hz process, but with another counter. I use a while/pass method for the waiting.

Here is the code :

from multiprocessing import Process, Event, Value
import time

# Add Timer class for multiprocessing
class Timer(Process):
    def __init__(self, interval, iteration, function, args=[], kwargs={}):
        super(Timer, self).__init__()
        self.interval = interval
        self.iteration = iteration
        self.iterationLeft = self.iteration
        self.function = function
        self.args = args
        self.kwargs = kwargs
        self.finished = Event()

    def cancel(self):
        """Stop the timer if it hasn't finished yet"""
        self.finished.set()

    def run(self):
        startTimeProcess = time.perf_counter()
        while self.iterationLeft > 0:
            startTimePeriod = time.perf_counter()
            self.function(*self.args, **self.kwargs)
            # print(self.interval-(time.clock() - startTimePeriod))
            self.finished.wait(self.interval-(time.perf_counter() - startTimePeriod))
            self.iterationLeft -= 1
        print(f'Process finished in {round(time.perf_counter()-startTimeProcess, 5)} seconds')


def func0(id, freq, tick_p1):
    # Wait for 2 runs of Process 1 (tick_p1)
    while tick_p1.value < 2:
        pass
    tick_p1.value = 0   # Reset tick_p1

    # Add fake computational time depending on the frequency of the process
    print(f'id: {id} at {freq} Hz') 
    if freq == 400:
        time.sleep(0.002)
    elif freq == 200:
        time.sleep(0.003)
    elif freq == 100:
        time.sleep(0.007)
    elif freq == 50:
        time.sleep(0.015)



def func1(id, freq, tick_p1, tick_p2):
    # Wait for tick_p1 to have been reset by Process0
    while tick_p1.value >= 2:
        pass
    # Wait for 2 runs of Process 2 (tick_p2)
    while tick_p2.value < 2:
        pass
    tick_p2.value = 0   # Reset tick_p2

    # Add fake computational time depending on the frequency of the process
    print(f'id: {id} at {freq} Hz') 
    if freq == 400:
        time.sleep(0.002)
    elif freq == 200:
        time.sleep(0.003)
    elif freq == 100:
        time.sleep(0.007)
    elif freq == 50:
        time.sleep(0.015)

    # Increment tick_p1
    tick_p1.value += 1


def func2(id, freq, tick_p2):
    # Wait for tick_p2 to have been reset by Process1
    while tick_p2.value >= 2:
        pass

    # Add fake computational time depending on the frequency of the process
    print(f'id: {id} at {freq} Hz') 
    if freq == 400:
        time.sleep(0.002)
    elif freq == 200:
        time.sleep(0.003)
    elif freq == 100:
        time.sleep(0.007)
    elif freq == 50:
        time.sleep(0.015)

    # Increment tick_p2
    tick_p2.value += 1



if __name__ == '__main__':
    freqs = [50,100,200]
    # freqs = [0.25,0.5,1]
    Tf = 10

    tick_p1 = Value('i', 1)
    tick_p2 = Value('i', 1)  

    processes = []
    p0 = Timer(interval=1/freqs[0], iteration=round(Tf*freqs[0]), function = func0, args=(0,freqs[0], tick_p1))
    p1 = Timer(interval=1/freqs[1], iteration=round(Tf*freqs[1]), function = func1, args=(1,freqs[1], tick_p1, tick_p2))
    p2 = Timer(interval=1/freqs[2], iteration=round(Tf*freqs[2]), function = func2, args=(2,freqs[2], tick_p2))
    processes.append(p0)
    processes.append(p1)
    processes.append(p2)

    start = time.perf_counter()
    for process in processes:
        process.start()   

    for process in processes:
        process.join()

    finish = time.perf_counter()

    print(f'Finished in {round(finish-start, 5)} seconds')

As you can see, I've added sleep time within the processes, to simulate computational time. When I remove the print commands in the processes, the script requires 10.2 seconds of runtime to simulate 10 seconds of "real-time" calculations (2% increase, which is acceptable).

My question is, is this the best way to achieve what I'm trying to do? Is there a better/faster way?

Thanks!

bobzwik
  • 53
  • 3

1 Answers1

0

I figured out a cleaner way to do this, but I'm still open to other suggestions.

Basically, instead of waiting for the moment to execute the next iteration, I've created a scheduler that flags when to run a cycle on each process (using shared Values).

There is a fast processes (lets say p2 running at 400 Hz) and all other processes must be a slower multiple of the frequency (lets say p1 and p0 200 and 100 Hz).

Instead of waiting for the right moment to raise execution flag (with wait() or sleep()), the scheduler loops with a while loop and checks if a period of p2 has ended. If the condition is met, it raised p2Flag and restart the period. Each process has its own flag, and the slower processes' flags get raised according to a counter which is incremented every period of p2. If 2 timesteps of p2 have run since the last time p1 was called, this scheduler will "wait" for p1 to be completed before raising p2's and p1's flag.

It's a bit complicated, but this ensures slower machines will obtain the same results as machines who can run this in "real-time".

from multiprocessing import Process, Value
import time

def func0(id, freq, endFlag, p0Flag, runIdx, Ts):
    while (endFlag.value == 0):
        if (p0Flag.value == 1):
            t = round(runIdx.value*Ts, 4)

            # Add fake computational time depending on the frequency of the process
            # print(f'id: {id} at {freq} Hz at {t}s') 
            if freq == 400:
                time.sleep(0.002)
            elif freq == 200:
                time.sleep(0.003)
            elif freq == 100:
                time.sleep(0.007)
            elif freq == 50:
                time.sleep(0.015)

            # Lower flag to confirm completion of cycle
            p0Flag.value = 0


def func1(id, freq, endFlag, p1Flag, runIdx, Ts):
    while (endFlag.value == 0):
        if (p1Flag.value == 1):
            t = round(runIdx.value*Ts, 4)

            # Add fake computational time depending on the frequency of the process
            # print(f'id: {id} at {freq} Hz at {t}s') 
            if freq == 400:
                time.sleep(0.002)
            elif freq == 200:
                time.sleep(0.003)
            elif freq == 100:
                time.sleep(0.007)
            elif freq == 50:
                time.sleep(0.015)

            # Lower flag to confirm completion of cycle
            p1Flag.value = 0


def func2(id, freq, endFlag, p2Flag, runIdx, Ts):
    while (endFlag.value == 0):
        if (p2Flag.value == 1):
            t = round(runIdx.value*Ts, 4)

            # Add fake computational time depending on the frequency of the process
            # print(f'id: {id} at {freq} Hz at {t}s') 
            if freq == 500:
                time.sleep(0.0015)
            elif freq == 400:
                time.sleep(0.002)
            elif freq == 200:
                time.sleep(0.003)
            elif freq == 100:
                time.sleep(0.007)
            elif freq == 50:
                time.sleep(0.015)

            # Update time for next iteration
            runIdx.value += 1
            # Lower flag to confirm completion of cycle
            p2Flag.value = 0



if __name__ == '__main__':
    # Set frequencies of processes
    # Last value of freqs is the fastest one, for process p2
    freqs = [50,100,200]    # Hz
    freqs = [100,200,400]   # Hz
    # freqs = [0.25,0.5,1]  # Hz
    Tf = 10
    Ts = round(1/freqs[-1], 4)

    # Create shared values for time index (runIdx)
    # Various flags to trigger the execution of the code in each process (p0Flag, ...)
    # A flag to en all processes
    runIdx = Value('I',0)
    p0Flag = Value('b', 0)
    p1Flag = Value('b', 0)
    p2Flag = Value('b', 0)
    endFlag = Value('b', 0)

    # How many times the fastest process has to run before flagging the slower processes
    p0_counter_exe = freqs[-1]/freqs[0]
    p1_counter_exe = freqs[-1]/freqs[1]

    if (not(freqs[-1] % freqs[0] == 0) or not(freqs[-1] % freqs[1] == 0)):
        raise Exception("Update rates for processes must be a multiple of the dynamic's update rate.")
    if (freqs[-1] < freqs[0]) or (freqs[-1] < freqs[1]):
        raise Exception("Dynamics update rate must be the fastest.")

    # p2 is at fastest frequency, p1 and p0 at lower frequencies
    p0 = Process(target=func0, args=(0, freqs[0], endFlag, p0Flag, runIdx, Ts))
    p1 = Process(target=func1, args=(1, freqs[1], endFlag, p1Flag, runIdx, Ts))
    p2 = Process(target=func2, args=(2, freqs[2], endFlag, p2Flag, runIdx, Ts))
    processes = []
    processes.append(p0)
    processes.append(p1)
    processes.append(p2)

    for process in processes:
        process.start()   
    time.sleep(0.5)

    # Start subprocesse's counters to execute directly at the first timestep
    p0_counter = p0_counter_exe
    p1_counter = p1_counter_exe

    # Scheduler
    #------------
    startTime  = time.perf_counter()
    periodEnd = time.perf_counter()
    while (runIdx.value*Ts < Tf):
        periodTime = time.perf_counter()-periodEnd
        do_p2 = False

        # Wait for new timestep AND completion of p2
        if (periodTime >= Ts and p2Flag.value == 0):

            # If p0 or p1 are expected to finish before the new timestep, wait for their completion
            # Depending on the situation, if slower processes have finished their cycle, make do_p2 True
            if (p1_counter == p1_counter_exe) and (p0_counter == p0_counter_exe):
                if (p1Flag.value == 0) and (p0Flag.value == 0):
                    do_p2 = True
            elif (p1_counter == p1_counter_exe):
                if (p1Flag.value == 0):
                    do_p2 = True
            elif (p0_counter == p0_counter_exe):
                if (p0Flag.value == 0):
                    do_p2 = True
            else:
                do_p2 = 1

            # If do_p2 is True, raise p2Flag for the p2 process
            if (do_p2):
                periodEnd = time.perf_counter()
                p2Flag.value = 1

                # If it's time to start a cycle for the slower processes, raise their flag and reset their counter
                if (p1_counter == p1_counter_exe):
                    p1Flag.value = 1
                    p1_counter = 0 
                if (p0_counter == p0_counter_exe):
                    p0Flag.value = 1
                    p0_counter = 0

                # Increment slower processes counter
                p1_counter += 1
                p0_counter += 1


    # Close all processes
    endFlag.value = 1

    for process in processes:
        process.join()

    print(f'Finished in {round(time.perf_counter()-startTime, 5)} seconds')
    print(Ts*runIdx.value)
bobzwik
  • 53
  • 3