This question is based on example code that I found here: How can I have a process wait for multiple resources?
My problem is this: Two sources of data feed one process. The sources of data are modelled as SimPy Stores. At each tick, the consuming process can handle any source that is available (i.e. 0, 1 or 2 messages each tick). As an example, the generating process is as below:
def sender(env):
    yield env.timeout(2)
    yield buf_b.put(99)
    print("Put 99 into buf_b at time %.2f." % env.now)
    yield buf_a.put(5)
    print("Put 5 into buf_a at time %.2f." % env.now)
    yield env.timeout(2)
    yield buf_b.put(7)
    print("Put 7 into buf_b at time %.2f." % env.now)
    yield env.timeout(4)
    yield buf_a.put(2)
    print("Put 2 into buf_a at time %.2f." % env.now)
    yield buf_b.put(6)
    print("Put 6 into buf_b at time %.2f." % env.now)
The receiving process is as follows:
def receiver_1(env):
    inchannel = [buf_a, buf_b]
    while True:
        # Make a new list of Get events in each iteration
        print(" Getting at time %.2f." % env.now)
        events = [ic.get() for ic in inchannel]
        # Wait until (at least) one of them was triggered
        res = yield env.any_of(events)
        # Cancel all remaining requests, because you will make
        # new ones in the next iteration.
        # Do this *before* you yield anything
        [evt.cancel() for evt in events]
        # Handle all messages (there *might* be more than one)
        for msg in res.values():
            print(" Got msg value %s at time %.2f." % (msg, env.now))
The output of the script is:
Getting at time 0.00.
Put 99 into buf_b at time 2.00.
Put 5 into buf_a at time 2.00.
Got msg value 99 at time 2.00.
Getting at time 2.00.
Put 7 into buf_b at time 4.00.
Got msg value 7 at time 4.00.
Getting at time 4.00.
Put 2 into buf_a at time 8.00.
Put 6 into buf_b at time 8.00.
Got msg value 2 at time 8.00.
Getting at time 8.00.
The issue is that on the first iteration of the receiver process (and similarly on later iterations), only one of the get() events is reported as triggered. In this example it corresponds to the put() of 99 into buf_b. Both get() requests are then cancelled, and the put() of 5 into buf_a is never seen.
I know that I cannot rely on event ordering within SimPy, so I can understand that the puts into buf_a and buf_b may not be seen on the same iteration of the receiver, even though they were made on the same simulation tick. What I don't understand is why the value put into buf_a disappears. Shouldn't a get() on the next iteration of the receiver loop return the value added to buf_a?
Perhaps the initial get() that was cancelled had already retrieved the value, so the Store is empty even though the receiver process never saw that get() as triggered?
EDIT: If I modify the receiver process with the following:
def receiver_1(env):
    inchannel = [buf_a, buf_b]
    while True:
        # Make a new list of Get events in each iteration
        print(" Getting at time %.2f." % env.now)
        events = [ic.get() for ic in inchannel]
        # Wait until (at least) one of them was triggered
        res = yield env.any_of(events)
        # Cancel all remaining requests, because you will make
        # new ones in the next iteration.
        # Do this *before* you yield anything
        for evt in events:
            if not evt.triggered:
                evt.cancel()
            else:
                print(" Got msg value %s at time %.2f." % (evt.value, env.now))
Then the output is:
Getting at time 0.00.
Put 99 into buf_b at time 2.00.
Put 5 into buf_a at time 2.00.
Got msg value 5 at time 2.00.
Got msg value 99 at time 2.00.
Getting at time 2.00.
Put 7 into buf_b at time 4.00.
Got msg value 7 at time 4.00.
Getting at time 4.00.
Put 2 into buf_a at time 8.00.
Put 6 into buf_b at time 8.00.
Got msg value 2 at time 8.00.
Got msg value 6 at time 8.00.
Getting at time 8.00.
This is what I expected. So it looks like, between this line:
res = yield env.any_of(events)
and
for evt in events:
    if not evt.triggered:
        evt.cancel()
    else:
        print(" Got msg value %s at time %.2f." % (evt.value, env.now))
the second event has triggered.
So my conclusion is that the original code is not safe for waiting on multiple events, because of event ordering within the simulator.
However, I'm not an expert here by any means, so I could be wrong.