Until this is resolved and re-confirmed by the Chapel team, kindly test any Chapel ZMQ module services with int payloads only, and possibly avoid the PUB/SUB archetype ( due to the pending string-matching issue ).
As @Nick has recently disclosed here, there is still a way to go before the ZMQ module services meet ZeroMQ API compliance and fully open a cross-compatible gate to heterogeneous distributed systems:
To send a string, Chapel sends one message with the string size followed by another message with the byte buffer; receiving works similarly.
That means that your one call to <aSocket>.recv( string ) was actually making two back-to-back calls to zmq_recv() under the hood. With the REQ/REP pattern, those two back-to-back zmq_recv() calls put the ZeroMQ state machine into an invalid state, hence the error message.
This is definitely a bug with Chapel's ZMQ module.
A few steps to shed more light on the scene:
Let me propose a few measures to take before diagnosing the root cause. ZeroMQ is quite a powerful framework, yet one could hardly pick a harder ( and more fragile ) messaging archetype than REQ/REP.
The internal Finite-State-Automata ( in fact, a distributed-FSA ) are blocking by design, to enforce a pendulum-like message passing among the connected peers ( which need not be just the first 2 ), so that a SEQ of [A]-.send()-.recv()-.send()-.recv()-... on one side [A] matches the SEQ of [B]-.recv()-.send()-.recv()-... on the other. This dFSA also has a principally un-salvageable mutual deadlock: if for any reason both sides enter a wait-state, both [A] and [B] expect to receive the next message from the opposite side of the channel, and neither ever sends it.
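The strict alternation described above can be pictured with a toy model. This is only a sketch in plain Python, not pyzmq and not the real libzmq state machine; the names ToyLockstepSocket and LockstepError are hypothetical. It shows why two back-to-back receives on a REP endpoint are rejected ( real libzmq reports this kind of violation as an EFSM error ).

```python
class LockstepError(Exception):
    """Raised when a call violates the send/recv alternation."""


class ToyLockstepSocket:
    """Toy model of a REQ or REP endpoint: calls must strictly alternate."""

    def __init__(self, role):
        if role not in ("REQ", "REP"):
            raise ValueError("role must be 'REQ' or 'REP'")
        self.role = role
        # A REQ endpoint must send first, a REP endpoint must receive first.
        self.expected = "send" if role == "REQ" else "recv"

    def _step(self, op):
        if op != self.expected:
            raise LockstepError(
                "%s socket expected .%s(), got .%s()" % (self.role, self.expected, op)
            )
        # Flip the pendulum: after a send comes a recv, and vice versa.
        self.expected = "recv" if op == "send" else "send"

    def send(self):
        self._step("send")

    def recv(self):
        self._step("recv")


rep = ToyLockstepSocket("REP")
rep.recv()                      # first receive ( e.g. the string length ) is fine
try:
    rep.recv()                  # second back-to-back receive breaks the lockstep
    outcome = "accepted"
except LockstepError:
    outcome = "rejected"
print(outcome)
```

A REQ endpoint in the same model accepts .send(), .recv(), .send() in that order and rejects anything else.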
This said, my advice would be to first move to the simplest possible test, using a pair of unrestricted, simplex channels ( be it [A]PUSH/[B]PULL + [B]PUSH/[A]PULL, or a slightly more complicated scheme with PUB/SUB ).
Not going into a setup for a fully meshed, multi-Agent infrastructure, but a simplified version of this ( without a need for, or intention to use, the ROUTER/DEALER channels, but perhaps duplicating ( reversed ) PUSH/PULL-s if extending the mock-up scheme ):
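A minimal sketch of the paired simplex channels suggested above, assuming pyzmq is installed. Both ends live in one process over inproc:// only to keep the example self-contained; the endpoint names "a-to-b" and "b-to-a" are made up for illustration, and a real test would use tcp:// between the Python and the Chapel side.

```python
import zmq

ctx = zmq.Context()

a_push = ctx.socket(zmq.PUSH)   # [A] -> [B] direction
b_pull = ctx.socket(zmq.PULL)
b_push = ctx.socket(zmq.PUSH)   # [B] -> [A] direction
a_pull = ctx.socket(zmq.PULL)

for s in (a_push, b_pull, b_push, a_pull):
    s.setsockopt(zmq.LINGER, 0)        # do not hang on close
    s.setsockopt(zmq.RCVTIMEO, 2000)   # raise zmq.Again instead of blocking forever

# Bind before connect, which every libzmq version accepts for inproc.
b_pull.bind("inproc://a-to-b")
a_push.connect("inproc://a-to-b")
a_pull.bind("inproc://b-to-a")
b_push.connect("inproc://b-to-a")

# No send/recv alternation is enforced: [A] may send twice in a row.
a_push.send(b"first")
a_push.send(b"second")
got = [b_pull.recv(), b_pull.recv()]

b_push.send(b"ack")
reply = a_pull.recv()

for s in (a_push, b_pull, b_push, a_pull):
    s.close()
ctx.term()
```

Unlike REQ/REP, neither direction depends on the state of the other, so a lost or extra message cannot lock both peers into a mutual wait.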

More effort will yet have to be spent on the implied limitations arising from the current Chapel implementation constraints:
In Chapel, sending or receiving messages on a Socket uses multipart messages and the Reflection module to serialize primitive and user-defined data types whenever possible. Currently, the ZMQ module serializes primitive numeric types, strings, and records composed of these types. Strings are encoded as a length (as int) followed by the character array (in bytes).
This raises some more issues on both sides, and some tweaking ought to be expected if these remarks are not just wire-level internals but extend to the top-level ZeroMQ messaging/signalling layer ( ref. details for managing subscriptions, where ZeroMQ topic-filter matching is based on a left-side exact match against the message received, et al ).
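To see why the length-prefixed string encoding can interfere with topic filtering, here is a small sketch in plain Python ( no ZeroMQ involved ). The helper names frame_string and topic_matches are hypothetical, and the 8-byte little-endian layout of the length is an assumption made for illustration, not something the Chapel documentation quoted above specifies.

```python
import struct


def frame_string(text):
    """Return the two frames for one string: its length, then its bytes.

    Assumption: the length is an 8-byte little-endian signed integer.
    """
    payload = text.encode("utf-8")
    return [struct.pack("<q", len(payload)), payload]


def topic_matches(subscription, first_frame):
    """Prefix match of a subscription against the leading bytes of a message."""
    return first_frame.startswith(subscription)


frames = frame_string("[chapel2python.HB]")
topic = b"[chapel2python.HB]"

matches_length_frame = topic_matches(topic, frames[0])    # length bytes come first
matches_payload_frame = topic_matches(topic, frames[1])   # the text itself
matches_empty_filter = topic_matches(b"", frames[0])      # empty filter accepts all

print(matches_length_frame, matches_payload_frame, matches_empty_filter)
```

Under these assumptions a subscriber filtering on the text never sees a match on the leading length bytes, which is why subscribing to the empty topic is a reasonable fallback while testing.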
The Python side enjoys much wider freedom of design:
#
# python
# #########
import time
import zmq; context = zmq.Context()
print( "INF: This Agent uses ZeroMQ v.{0:}".format( zmq.zmq_version() ) )
dataAB = context.socket( zmq.REQ )
dataAB.setsockopt( zmq.LINGER, 0 )        # ( a must in pre v4.0+ )
dataAB.connect( "tcp://localhost:5555" )
heartB = context.socket( zmq.SUB )
heartB.setsockopt( zmq.LINGER, 0 )        # ( a must in pre v4.0+ )
heartB.setsockopt( zmq.CONFLATE, 0 )      # ( 1 would keep just the last message,
                                          #   but CONFLATE does not support multipart )
heartB.connect( "tcp://localhost:6666" )
heartB.setsockopt( zmq.SUBSCRIBE, b"[chapel2python.HB]" )
heartB.setsockopt( zmq.SUBSCRIBE, b"" )   # in case [Chapel] complicates serialisation
# -------------------------------------------------------------------
while ( True ):
    print( "INF: waiting for a [Chapel] HeartBeat-Message" )
    try:
        hbIN = heartB.recv( zmq.NOBLOCK ) # raises zmq.Again if nothing has arrived yet
    except zmq.Again:
        hbIN = b""
    if len( hbIN ) > 0:
        print( "ACK: [Chapel] Heart-Beat-Message .recv()-ed" )
        break
    else:
        time.sleep( 0.5 )
# -------------------------------------------------------------------
for request in range(10):
    print( "INF: Sending a request %s to [Chapel] ..." % request )
    dataAB.send( b"Yo" )                  # bytes, so it works on Python 2 and 3
    print( "INF: a blocking .recv(), [Chapel] is to answer ..." )
    message = dataAB.recv()
    print( "INF: [Chapel] said %s" % message )
# -------------------------------------------------------------------
dataAB.close()
heartB.close()
context.term()
# -------------------------------------------------------------------
Some further try:/except:/finally: constructs ought to be put in service for KeyboardInterrupt-s from infinite while()-loops et al, but for clarity, these were omitted here.
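The omitted try:/except:/finally: handling could look roughly like the sketch below. It is written against plain callables so it runs without ZeroMQ; run_until_interrupted, step and cleanup are hypothetical names, with step standing in for one pass of the polling loop and cleanup for the .close() / .term() calls.

```python
def run_until_interrupted(step, cleanup):
    """Call step() until it returns True; always run cleanup(), even on Ctrl-C."""
    try:
        while True:
            if step():
                return "done"
    except KeyboardInterrupt:
        return "interrupted"
    finally:
        cleanup()           # sockets and context get released on every exit path


closed = []


def fake_step():
    # Stand-in for a loop body during which the user presses Ctrl-C.
    raise KeyboardInterrupt


result = run_until_interrupted(fake_step, lambda: closed.append("sockets closed"))
print(result, closed)
```

Because the cleanup sits in the finally: clause, it runs whether the loop finishes normally or is interrupted.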
On the Chapel side we will do our best to keep pace with the API, as-is:
The documentation, as-is, does not yet help to decide whether user-code has an option to control if a call to the .send() / .recv() method is implicitly always blocking or not, while your code assumes it is being run in a blocking mode ( which I always and principally strongly discourage for any distributed-system design; blocking is a poor practice, more on this here ).
While the C-level call zmq_send() may be a blocking call (depending on the socket type and flag arguments), it is desirable that a semantically-blocking call to Socket.send() allow other Chapel tasks to be scheduled on the OS thread as supported by the tasking layer. Internally, the ZMQ module uses non-blocking calls to zmq_send() and zmq_recv() to transfer data, and yields to the tasking layer via chpl_task_yield() when the call would otherwise block.
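The "non-blocking call plus yield" idea in the quoted passage can be illustrated with a toy cooperative scheduler in plain Python. This is a sketch of the general technique only, not Chapel's implementation; ToySocket, WouldBlock, blocking_recv and run are hypothetical names, and the bare yield plays the role that chpl_task_yield() plays in the quote.

```python
import collections


class WouldBlock(Exception):
    """Raised by a non-blocking receive when no message is queued."""


class ToySocket:
    def __init__(self):
        self.inbox = collections.deque()

    def recv_nowait(self):
        if not self.inbox:
            raise WouldBlock()
        return self.inbox.popleft()


def blocking_recv(sock):
    """Looks blocking to the caller, but yields while nothing is available."""
    while True:
        try:
            return sock.recv_nowait()
        except WouldBlock:
            yield               # hand control back so other tasks can run


def receiver(sock, received):
    msg = yield from blocking_recv(sock)
    received.append(msg)


def sender(sock, log):
    log.append("sender ran")    # proves this task ran while the receiver waited
    yield
    sock.inbox.append(b"hello")


def run(tasks):
    """Round-robin scheduler: resume each task until all have finished."""
    queue = collections.deque(tasks)
    while queue:
        task = queue.popleft()
        try:
            next(task)
        except StopIteration:
            continue
        queue.append(task)


sock, received, log = ToySocket(), [], []
run([receiver(sock, received), sender(sock, log)])
print(received, log)
```

The receiver never monopolises the scheduler: each time its non-blocking attempt fails it steps aside, which lets the sender run and eventually deliver the message.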
Source
use ZMQ;
var context: Context;
var dataBA = context.socket( ZMQ.REP ),
    heartB = context.socket( ZMQ.PUB );
var WAITms = 0;                            // setup as explicit int
dataBA.setsockopt( ZMQ.LINGER, WAITms );   // a must
heartB.setsockopt( ZMQ.LINGER, WAITms );   // a preventive step
dataBA.bind( "tcp://*:5555" );             // may reverse .bind()/.connect()
heartB.bind( "tcp://*:6666" );             // the python side .connect()-s here
writeln( "INF: This Agent uses ZeroMQ v.", ZMQ.version );
// /\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/
config var MAX_LOOPS = 120;                // --MAX_LOOPS=10 set on cmdline
var i = 0;
while ( i < MAX_LOOPS ) {
  // --------------------------------------- // .send HeartBeat
  heartB.send( "[chapel2python.HB]" );
  i += 1;
  writeln( "INF: Sent HeartBeat # ", i );
  // --------------------------------------- // .send HeartBeat
  var msg = dataBA.recv( string );         // .recv() from python
  // - - - - - - - - - - - - - - - - - - - - // - - - - -WILL-[BLOCK]!!!
  //                                         // ( ref. src )
  writeln( "INF: [Chapel] got: ", msg );   // msg is already a string
  dataBA.send( "back from chapel" );       // .send() to python
}
writeln( "INF: MAX_LOOPS were exhausted,",
         " will exit-{} & .close()",
         " channels' sockets before",
         " [Chapel] exits to system."
         );
// /\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/
dataBA.close( WAITms );                    // explicit graceful termination
heartB.close( WAITms );                    // explicit graceful termination
context.deinit();                          // explicit context termination
                                           //          as not yet sure
                                           //          on auto-termination
                                           //          warranties