
This question is based on example code that I found here: How can I have a process wait for multiple resources?

My problem is this: two sources of data feed one process. The sources of data are modelled as SimPy Stores. At each tick, the consuming process can handle whichever sources have data available (i.e. 0, 1 or 2 messages per tick). As an example, the generating process is shown below:

# buf_a and buf_b are the two SimPy Stores, created elsewhere (setup not shown).
def sender(env):
    yield env.timeout(2)
    yield buf_b.put(99)
    print ("Put 99 into buf_b at time %.2f." % env.now)
    yield buf_a.put(5)
    print ("Put 5 into buf_a at time %.2f." % env.now)

    yield env.timeout(2)
    yield buf_b.put(7)
    print ("Put 7 into buf_b at time %.2f." % env.now)

    yield env.timeout(4)
    yield buf_a.put(2)
    print ("Put 2 into buf_a at time %.2f." % env.now)
    yield buf_b.put(6)
    print ("Put 6 into buf_b at time %.2f." % env.now)

The receiving process is as follows:

def receiver_1(env):
    inchannel = [buf_a, buf_b]

    while True:
        # Make a new list of Get events in each iteration
        print ("          Getting at time %.2f." % env.now)
        events = [ic.get() for ic in inchannel]

        # Wait until (at least) one of them was triggered
        res = yield env.any_of(events)

        # Cancel all remaining requests, because you will make
        # new ones in the next iteration.
        # Do this *before* you yield anything
        for evt in events:
            evt.cancel()

        # Handle all messages (there *might* be more than one)
        for msg in res.values():
            print ("          Got msg value %s at time %.2f." % (msg, env.now))

The output of the script is:

          Getting at time 0.00.
Put 99 into buf_b at time 2.00.
Put 5 into buf_a at time 2.00.
          Got msg value 99 at time 2.00.
          Getting at time 2.00.
Put 7 into buf_b at time 4.00.
          Got msg value 7 at time 4.00.
          Getting at time 4.00.
Put 2 into buf_a at time 8.00.
Put 6 into buf_b at time 8.00.
          Got msg value 2 at time 8.00.
          Getting at time 8.00.

The issue is that on the first iteration of the receiver process (and similarly at time 8.00), only one of the get() events is returned by any_of. In this example it corresponds to the put() into buf_b with the value 99. Both get() requests are then cancelled, and the value 5 that was put into buf_a is never seen.

I know that I cannot rely on event ordering within SimPy so I can understand that the put() into buf_a and buf_b may not be triggered on the same iteration of the receiver, even though they were made on the same simulation tick. However, what I don't understand is why the put() for buf_a disappears. Shouldn't a get() on the subsequent iteration of the receiver loop return with the value added to buf_a?

Perhaps this is because the initial get() that was cancelled had already retrieved the value, so the Store is now empty even though the receiver process never saw that get() as triggered?

EDIT: If I modify the receiver process with the following:

def receiver_1(env):
    inchannel = [buf_a, buf_b]

    while True:
        # Make a new list of Get events in each iteration
        print ("          Getting at time %.2f." % env.now)
        events = [ic.get() for ic in inchannel]

        # Wait until (at least) one of them was triggered
        res = yield env.any_of(events)

        # Cancel all remaining requests, because you will make
        # new ones in the next iteration.
        # Do this *before* you yield anything
        for evt in events:
            if not evt.triggered:
                evt.cancel()
            else:
                print("          Got msg value %s at time %.2f." % (evt.value, env.now))

Then the output is:

          Getting at time 0.00.
Put 99 into buf_b at time 2.00.
Put 5 into buf_a at time 2.00.
          Got msg value 5 at time 2.00.
          Got msg value 99 at time 2.00.
          Getting at time 2.00.
Put 7 into buf_b at time 4.00.
          Got msg value 7 at time 4.00.
          Getting at time 4.00.
Put 2 into buf_a at time 8.00.
Put 6 into buf_b at time 8.00.
          Got msg value 2 at time 8.00.
          Got msg value 6 at time 8.00.
          Getting at time 8.00.

This is what I expected. So it looks like the second event has triggered somewhere between this line:

res = yield env.any_of(events)

and this loop:

        for evt in events:
            if not evt.triggered:
                evt.cancel()
            else:
                print("          Got msg value %s at time %.2f." % (evt.value, env.now))

So my conclusion is that the original receiver, which cancels every get() without checking whether it has triggered, is not a safe way to wait for multiple events, because of event ordering within the simulator.

However, I'm not an expert here by any means so I could be wrong.

  • in sender, what happens if you drop the yields in front of the puts? I am wondering if the any_of is firing before the second sender put despite what the print log is showing. – Michael Aug 05 '22 at 21:55
  • If I remove the yields then I get both puts() coming through. I don't understand though why that makes a difference? If this comes down to event ordering then it still seems to me that this code is not safe. – Andrew Lees Aug 05 '22 at 22:05
  • Every time you do a yield, simpy checks to see if any other events are ready to fire. When the first sender put finishes, the receiver's any_of is ready to fire. When the sender gets to the second put and sees the yield, it pauses and lets the receiver handle the any_of event before it does the sender's second put. In simpy, every time you yield, that process becomes idle and gives other processes a chance to process events. If a process ran from start to end, then the infinite loops would never stop and no other process would have a chance to run. – Michael Aug 05 '22 at 22:19
  • Actually, the more I think about it, the receiver's any_of might get processed after the sender's first put, but before returning from the first yield. – Michael Aug 05 '22 at 22:27
  • OK, thanks. That explains then why removing the yields makes a difference here. However, what if the sender must use a yield to avoid data loss (i.e. if in a specific application the receiver cannot immediately perform a get for some reason)? Is there a way of forcing the receiver process to pend execution until all of the events generated by the sender have been executed on any given tick? – Andrew Lees Aug 05 '22 at 22:43
  • Or what if the puts were in different processes and that after the Store put() there is a yield timeout? That would mean that the receiver would behave differently depending on whether the second put process is run ahead of it or not. – Andrew Lees Aug 05 '22 at 22:44
  • In my specific application (not shown here) I have 1 process that waits on data available in 2 Stores (process A). Each Store is updated by its own process (process B & C). When data is available in the Stores, it sends a message to a further process (process D). The contents of the message sent to process D is a function of both of the original Stores. So therefore the message sent by process A would be different depending on the relative scheduling of processes A, B, C and D. – Andrew Lees Aug 05 '22 at 22:45
  • Not sure I get it all, but if you need an object from both queues, why not use an all_of instead of an any_of? – Michael Aug 05 '22 at 23:37
  • Another option is to use a store as a semaphore: each time a process adds to a queue, it would check if the other queues were ready, and if they are, that process would add a token to the semaphore queue. Your receiver would first wait for the semaphore to get its token, then it would do gets on all the queues. – Michael Aug 05 '22 at 23:42
  • Thanks Michael. Some useful tips there. I will give it a try. – Andrew Lees Aug 06 '22 at 09:31
