( The O/P MCVE-problem definition keeps creeping further - yet the problem of coordination, be it prioritised or not, of {sensors|actors}-control-systems, the more so of systems designed with a use of distributed autonomous agents, is professionally very complex, and it is easy to take defective "shortcuts" or to get into a system-wide blocking-state.
Best first read at least this about the ZeroMQ Hierarchy in Less Than Five Seconds and this about mutual dead-lock blocking.
Reading the fabulous Pieter HINTJENS' book "Code Connected: Volume 1" is of immense value for any system designer )
"...seams quite interesting as it implements the async already, so I could just add the async zmq as I did. am I wrong?"
Yes, there is no "just add async" shortcut. Control-systems are a very interesting discipline, but rather a complex one. Always. Sorry to have to say that straight. Some complexities may remain hidden from the user in schoolbook examples or in trivial makers-projects. The hammer then comes right upon trying to extend them by just adding one or a few more trivial features. The complexity suddenly surfaces, not seen before.
The formal map of the O/P multi-agent-[A,B,C,D]-system code (as-is)
Put the formal map on a full-screen editor so as to see the bigger picture of all the mutually conflicting dependencies and competing loops-of-control. Latency is the easy part. The several places with risks of un-resolvable deadlock blocking are the core one. ZeroMQ, since v2.x, has had tooling for avoiding some of these; the software designer has the responsibility to properly mitigate all the others. Control systems ( robotics or other ) have to prove such robustness and resilience to errors, and to safely "survive" all "external" mishaps as well.
The best point to start with is the old golden rule, as expressed in the assembly language directive on row 1:
;ASSUME NOTHING
and fight to carefully design all the rest.
multi-agent-[A,B,C,D]-system coordination
| | | |
+-|-|-|--------------------- python while ~ 100 [ms] GIL-lock enforced quota for pure-[SERIAL]-ised code-execution, imposed on all python-threads ( be it voluntarily or involuntarily interrupted by the python GIL-lock mechanics, O/S-specific )
+-|-|--------------------- hardware ~ 64 - 147 [ms] self.board proxy-driven, responding to python code
+-|--------------------- python asynchronous, strict sequence of remote/local events dependent ZeroMQ dFSA, distributed among local-code operated REP and remote-code operated REQ-side(s) - enforcing a mutually ordered sequence of distributed behaviour as REQ/REP Scalable Formal Communication Archetype Pattern defines
+--------------------- python asyncio.get_event_loop() instantiated another event-loop that may permit to defer an execution(s) of some parts of otherwise imperative python-code to some later time
multi-agent-[A,B,C,D]-system code (as-is)
: : : :
: : : +---------------------------------------------------------+
: : +-----------------------------------------------------------:-------------------+ - - - - - - - - - - - - - - - - -<?network?>- - - - - - - - - - - - - - +
: +-------------------------------------------------------------:----------+ : :
: : : : :
: : : : :
! : : : :
____PYTHON___! : : : :
! ? ? ? ?
+->! D? B? C?REP-1:{0:N}-remote---------------<?network?>------------------------REQ.C? dFSA-state?dependent
^ ! D? B? C?REP-1:{0:N} .C?
^ A!: IMPERATIVE LOOP-HEAD: while True: D?AWAIT B? C?REP-1:{0:N}-distributed-Finite-State-Automaton (dFSA) BEHAVIOUR, local .C? side depends also on EVOLUTION OF A FUZZY, DYNAMIC, MULTIPARTY, network-wide dFSA-STATE(s) inside such ECOSYSTEM
^ ! D? B? C?
^ ! D? B? C? REQ.C?-distributed-Finite-State-Automaton-STATE-REP.C?
^ ! D? B? C? vC? ^C?
^ !_______.SET DEFERRED: P_D?C?_deferred_yield_ping =D?await ... C?REP.recv()---<--?---?--vC?-----<--<network>--------<--?remote-REQ-state-C?-( ^C?-dFSA-state && C?.recv()-blocking-mode of REQ/REP .recv()-waiting till a message, if any arrives error-free, blocks till then, just deferred via D?await )
^ ! D? B? vC? ^C?
^ !_______.SET DEFERRED: S_D?B?_deferred_yield_sonar =D?await ...B?.board.sonar_read()-o-<--?-+ vC? ^C?
^ ! : | vC? ^C?
^ !_______.GUI OUTPUT: print( deferred_yield_sonar ) #A!->-----------------------------+->----?->---:?--->[ a last-known (if any) S_D?B?_deferred_yield_sonar value put "now" on GUI-screen ]
^ ! : ^ vC? ^C?
^ !_______.SET TRANSFORMED: S_D?B?_dependent_transformed =A!json.dumps( S_D?B? )-<--<--<--+ | vC? <--[ a last-known (if any) S_D?B?_deferred_yield_sonar value transformed and assigned]
^ ! : | vC? ^C?
^ !_______.BLOCKING-MODE-SEND() REP.send( S_D?B?_dependent_transformed.encode() ) #C? .send( S_D?B? )--?---->C?-->----<?network?>-->-------?remote-REQ-state-C?-( +C?-indeterministic and blocking-mode of REQ/REP .recv()-waiting till a message, if any arrives error-free, blocks till then )
^ !X:C? ^ vC? ^C?
^ !X:C?___.SET IMPERATIVE: i += 1 | REQ.C?-distributed-Finite-State-Automaton-STATE-REP.C?
^ !X:C? ?
^ !X:C?___.NOP/SLEEP() DEFERRED: await sleep( ... ) #D?AWAIT ^ :
^ !X:C?D?+0ms | :
^ !X:C?D?_.JUMP/LOOP ? :
^__!X:C?D?+0ms ^ :
| :
| :
| :
____SONAR___________________________________________________________________________________________________________B? REQUEST T0: + EXPECT ~64 - ~147 [ms] LATENCY :
B? ( hardware value acquisition latency can be masked :
via await or other concurrency-trick ) :
:
____REQ-side(s)_?{0:N} __________________________________________________________________________________________________________________________________________________^C? dFSA-remote autonomous-agent outside-of-CodeBlock-scope-of-control + behind <?network?>
_____REQ-side(s)_?{0:N} _________________________________________________________________________________________________________________________________________________^C? dFSA-remote autonomous-agent outside-of-CodeBlock-scope-of-control + behind <?network?>
______REQ-side(s)_?{0:N} ________________________________________________________________________________________________________________________________________________^C? dFSA-remote autonomous-agent outside-of-CodeBlock-scope-of-control + behind <?network?>
_______REQ-side(s)_?{0:N} _______________________________________________________________________________________________________________________________________________^C? dFSA-remote autonomous-agent outside-of-CodeBlock-scope-of-control + behind <?network?>
... ::: ...
______...REQ-side(s)_?{0:N} _____________________________________________________________________________________________________________________________________________^C? dFSA-remote autonomous-agent outside-of-CodeBlock-scope-of-control + behind <?network?>
As the O/P's EDIT : has explained 2 hours ago, the problem is now evident. The infinite while True:-loop instructs to hard-step through, line by line, and to loop-"rotate" all the steps again, one after another, while any asyncio await-decorated functor(s) present there are left asynchronously independent of this "main" A: while True:-loop block of imperative code-execution. In the same way, the B: self.board-device's external sonar-device is an independently timed device, external to the python code, having some unmanageable hardware/read/decode-latencies. The coordination of the fixed-looping then meets C: the ZeroMQ-REQ/REP-Archetype behaviour ( again externally coordinated with decentralised "foreign" REQ-actor(s)/agent(s) - yes, you cannot know how many of these are out there ... - all of them being outside of your scope of control, and both all the REQ-side(s)' and your locally-instantiated REP-side's distributed-Finite-State-Machine states being fully independent of the "framing"-python loop's will to drive steps forward and execute the next step, the next step, the next step ... ), plus D: the asyncio.get_event_loop()-instantiated "third" event_loop, which influences how the await-decorated functors are actually permitted to defer to yield their results and deliver them at some later time ----- and this is the problem of the "cross-bred" event_loops.
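If of help, a minimal sketch - assuming a pyzmq build with the zmq.asyncio integration and a hypothetical tcp://*:5555 endpoint - of how the blocking REP-side calls may be folded into the very same event-loop, so that an await-ed .recv() defers instead of freezing all the other agents:

import asyncio
import zmq
import zmq.asyncio

async def rep_agent( url = "tcp://*:5555" ):    # hypothetical endpoint
    ctx = zmq.asyncio.Context.instance()
    rep = ctx.socket( zmq.REP )
    rep.bind( url )
    try:
        while True:
            msg = await rep.recv()              # deferred, does not block the event-loop
            await rep.send( b"ACK:" + msg )     # strict REQ/REP alternation preserved
    finally:
        rep.close()

async def other_agent():
    while True:
        await asyncio.sleep( 0.1 )              # e.g. a B?-like deferred hardware-poll may live here

async def main():
    await asyncio.gather( rep_agent(), other_agent() )

# asyncio.run( main() )                         # one shared event-loop for all local agents

The design choice is to have a single owner of a single event-loop, instead of the "cross-bred" pair of loops competing against each other.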
If this problem setup has been elaborated by a Computer Science professor, she/he deserves a standing ovation for making the task the best example of the problems with distributed systems - it may almost serve as a tribute to Mrs. Margaret HAMILTON's work on the proper design of the Apollo AGC computer system, where her work solved this class of problems and thus saved the lives of the crew and all the pride of the Moon landing, right 50 years ago. Great lecture, Mrs. Hamilton, great lecture.
Trivial, yet right on the spot.
Indeed a lovely and scientifically marvelous task:
Design a strategy for a robust, failure-resilient and coordinated work of a set of independently timed and operated agents [A, B, C, D] : A being an imperative, interpreted python language, principally having zero concurrency ( as the GIL-lock prevents it ) but a pure-[SERIAL] process-flow, C being a fuzzy set of semi-persistent network-distributed REQ/REP-agents, B being an independently operated hardware device with some limited I/O interfacing to an A-inspectable self.board-proxy, and all of them being mutually independent and physically distributed across a given ecosystem of software, hardware and network.
Hardware diagnostics + a System Architecture approach have already been proposed yesterday. Without testing the self.board-hosted sonar-device latencies, no one can decide the next best step, as the realistic ( in-vivo benchmarked ) hardware response-times ( + best also the documentation down to the .board and its peripheral sensor device(s) - MUX-ed or not? PRIO-driven or MUTEX-locked or a static, non-shared peripheral device, register-read-only abstracted, ... ? ) are cardinal for deciding about the possible [A, B, C, D]-coordination strategy.
The ZeroMQ part :
If you comment out l.5
- REP_server_django.send( json_data.encode() ) # l.5
you get into a terminal block, as the original, strict form of the REQ/REP ZeroMQ Scalable Formal Communication Archetype Pattern cannot .recv() again, if it did not first reply to the REQ-side, with a .send(), after the first .recv() has been received.
This was a simple catch.
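A minimal sketch of this very catch, using a hypothetical tcp://*:5555 endpoint - the strict REP archetype raises a zmq.ZMQError ( EFSM ) on any attempt to .recv() again before the pending .send() has replied:

import zmq

ctx = zmq.Context.instance()
rep = ctx.socket( zmq.REP )
rep.bind( "tcp://*:5555" )         # hypothetical endpoint

request = rep.recv()               # 1st .recv() - legal, blocks till a request arrives
try:
    rep.recv()                     # 2nd .recv() in a row - illegal for the strict REP archetype
except zmq.ZMQError as e:
    print( "EFSM:", e )            # "Operation cannot be accomplished in current state"

rep.send( b"a reply" )             # only after a .send() may a next .recv() legally follow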
The rest is not a reproducible piece of code.
You may want to:
- verify, if self.board.sonar_read( trigger_pin ) receives any value at all, and test the latency of doing that:
import numpy as np
from   zmq import Stopwatch

aClk = Stopwatch()                                    # a [us]-resolution stop-watch

def sonarBeep():                                      # relies on the O/P's self.board proxy + trigger_pin
    try:
        a_value   = -1
        aClk.start()                                  # start the [us]-stop-watch
        a_value   = self.board.sonar_read( trigger_pin )
        a_time_us = aClk.stop()                       # stop it right upon the read's return
    except:
        try:
            aClk.stop()                               # the stop-watch may still be running
        finally:
            a_time_us = -1                            # signal a failed read attempt
    finally:
        return( a_value, a_time_us )
and run a series of 100 sonar-tests, to get the min, Avg, StDev and MAX readings about the latency times, all in [us], as these values are cardinal to know in case some control-loops are to be designed w.r.t. the SONAR-sensor data.

aListOfLatencies = [ sonarBeep()[1] for _ in range( 100 ) ]   # collect the 100 readings just once

[ aFun( aListOfLatencies )                                    # min, Avg, StDev, MAX - all from the same sample
  for aFun in ( np.min, np.mean, np.std, np.max )
  ]
The System Architecture and sub-systems coordination :
Last but not least, one may let the sonar data be read and stored in an absolutely independent event-loop, uncoordinated with any other operations, and just read a state-variable from such a storage, the state-variable being set by that independently working subsystem ( unless an extreme power-saving need prohibits running that as an independent system behaviour ).
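A minimal sketch of such a decoupled design, here assuming an asyncio-based deployment and using random.uniform() as a stand-in for the real self.board.sonar_read( trigger_pin ) call - one task keeps a last-known reading fresh, any other component just reads the state-variable, never waiting for the hardware:

import asyncio
import random                                          # a stand-in for the real sonar read

last_sonar_reading = None                              # a MODEL-like state-variable

async def sonar_scanner( period_s = 0.1 ):             # hypothetical scan-period
    global last_sonar_reading
    while True:                                        # an independently timed subsystem
        last_sonar_reading = random.uniform( 2, 400 )  # here: self.board.sonar_read( trigger_pin )
        await asyncio.sleep( period_s )

async def any_consumer():
    while True:                                        # just reads the last-known value
        print( "last-known SONAR:", last_sonar_reading )
        await asyncio.sleep( 1.0 )

async def main():
    await asyncio.gather( sonar_scanner(), any_consumer() )

# asyncio.run( main() )                                # both run uncoordinated, no mutual blocking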
Whenever one tries to tightly coordinate a flow of independent events ( the worst case being distributed systems with uncoordinated or weakly coordinated agents ), the design has to grow in robustness to errors and time mis-alignments, and in error-resilience. Otherwise the system may soon deadlock/livelock itself in a snap.
If in doubt, one may learn from the original philosophy of the XEROX Palo Alto Research Center MVC-separation, where the MODEL-part can ( and in the GUI-frameworks, since 198x+, most of the time does ) receive many state-variables that are updated independently of the other system components, which just read/use the actual state-variables' data if they need them and as they need them. Similarly, the SONAR can, if the power budget permits, continuously scan the scene and write the readings into a local register, and let the other components come and ask for, or get served with, the last actual SONAR reading.
So does the ZeroMQ zen-of-zero work.
If that may help, check the zmq.CONFLATE mode of the local-side message-store, which works right this way.
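A minimal sketch of the zmq.CONFLATE mode, here on a SUB-side against a hypothetical publisher at tcp://localhost:5556 - the local message-store then keeps just the most recent message, never a growing backlog:

import zmq

ctx = zmq.Context.instance()
sub = ctx.socket( zmq.SUB )
sub.setsockopt( zmq.CONFLATE, 1 )        # keep only the last message; set before .connect()
sub.connect( "tcp://localhost:5556" )    # hypothetical publisher endpoint
sub.setsockopt( zmq.SUBSCRIBE, b"" )     # subscribe to everything

latest = sub.recv()                      # always the last-known message, older ones silently dropped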
A minor note: one might have already noted that sleep( 1 / 1000 ) is quite an expensive, repetitively executed step, and a dangerous one, as it effectively does no sleep at all in py2.x, due to the integer division.
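For illustration, an explicit float literal avoids that trap in both language generations:

from time import sleep

sleep( 1 / 1000 )   # py2.x: integer division yields 0 -> sleep( 0 ), i.e. no sleep at all
sleep( 1e-3 )       # an explicit float sleeps ~1 [ms] in py2.x and py3.x alike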