I'm new to ZeroMQ and am working through the NetMQ and ZeroMQ documentation as I go. I'm preparing to implement the Paranoid Pirate Pattern and have hit a snag. A single app runs the server(s), the clients, and eventually the queue, though I haven't implemented the queue yet. Right now only one server should be running at a time, and I can launch as many clients as I like, all talking to that single server. I can make the server "crash" and restart it (manually for now, automatically soon). That all works, or at least restarting the server works once.
To enforce that only a single server is running, I have a thread (which I'll call the WatchThread) that opens a response socket, binds it to an address, and polls for messages. When the server dies, it signals its demise, and the WatchThread decrements the server count when it receives that signal. Here's the code snippet that is failing:
// This is the server's main loop:
public void Start(object? count)
{
    num = (int)(count ?? -1);
    _model.WriteMessage($"Server {num} up");
    var rng = new Random();
    using ResponseSocket server = new();
    server.Bind(tcpLocalhost); // This is for talking to the clients
    int cycles = 0;
    while (true)
    {
        var message = server.ReceiveFrameString();
        if (message == "Kill")
        {
            server.SendFrame("Dying");
            return;
        }
        if (cycles++ > 3 && rng.Next(0, 16) == 0)
        {
            _model.WriteMessage($"Server {num} \"Crashing\"");
            RequestSocket sock = new(); // This is for talking to the WatchThread
            sock.Connect(WatchThreadString);
            sock.SendFrame("Dying"); // This isn't working correctly
            return;
        }
        if (cycles > 3 && rng.Next(0, 10) == 0)
        {
            _model.WriteMessage($"Server {num}: Slowdown");
            Thread.Sleep(1000);
        }
        server.SendFrame($"Server{num}: {message}");
    }
}
And here's the WatchThread code:
public const string WatchThreadString = "tcp://localhost:5000";

private void WatchServers()
{
    _watchThread = new ResponseSocket(WatchThreadString);
    _watchThread.ReceiveReady += OnWatchThreadOnReceiveReady;
    while (_listen)
    {
        bool result = _watchThread.Poll(TimeSpan.FromMilliseconds(1000));
    }
}

private void OnWatchThreadOnReceiveReady(object? s, NetMQSocketEventArgs a)
{
    lock (_countLock)
    {
        ServerCount--;
    }
    _watchThread.ReceiveFrameBytes();
}
As you can see, it's pretty straightforward. What am I missing? What should happen is exactly what happens the first time everything is instantiated: the server is supposed to go down, so it opens a new socket to the pre-existing WatchThread and sends a frame. The WatchThread receives the message and decrements the counter. It's only for the second server that things don't behave as expected...
Edit: I was able to get it to work by unbinding/closing _watchThread and recreating it. That's definitely suboptimal, and it still seems like I'm missing something. It's almost as if I can only use that socket once, though I have other request sockets that are used multiple times.
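One thing I have not ruled out: as far as I understand the ZeroMQ docs, a REP socket has to alternate strictly between receive and send, and my WatchThread handler receives without ever replying. Below is a minimal stand-alone sketch of what I think a replying handler would look like. It is not my real app: the `address` constant, the `serverCount` local, the empty acknowledgement frame, and the two simulated "dying" servers are all made up for illustration, and I'm assuming the NetMQ calls used here (`Poll`, `SendFrameEmpty`, `TryReceiveFrameString`) behave the way their documentation describes.

```csharp
using System;
using System.Threading.Tasks;
using NetMQ;
using NetMQ.Sockets;

// Hypothetical stand-alone repro; names and values are illustrative only.
const string address = "tcp://localhost:5000";
int serverCount = 2;

using var watcher = new ResponseSocket();
watcher.Bind(address);

watcher.ReceiveReady += (_, args) =>
{
    // Consume the "Dying" notification...
    args.Socket.ReceiveFrameString();
    serverCount--;
    // ...then reply, which (as I understand it) is what puts a REP socket
    // back into a state where it can receive the next request.
    args.Socket.SendFrameEmpty();
};

for (int i = 0; i < 2; i++)
{
    // Simulate one server "crashing" and notifying the watcher.
    var dying = Task.Run(() =>
    {
        using var sock = new RequestSocket();
        sock.Connect(address);
        sock.SendFrame("Dying");
        // Wait briefly for the acknowledgement before the socket is disposed.
        sock.TryReceiveFrameString(TimeSpan.FromSeconds(1), out var ack);
    });

    // Keep polling on this thread until the simulated server has finished.
    while (!dying.IsCompleted)
    {
        watcher.Poll(TimeSpan.FromMilliseconds(100));
    }
    dying.Wait();
}

Console.WriteLine($"Servers still counted: {serverCount}");
```

If the strict alternation really is the cause, I'd expect both notifications to be counted in this sketch, whereas without the `SendFrameEmpty` call only the first one would be. I haven't confirmed that this is what is happening in my actual code.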
Additional Edit: My netstat output with 6 clients running (kubernetes is in my hosts file as 127.0.0.1, as is detailed here):
TCP    127.0.0.1:5555    MyComputerName:0    LISTENING
TCP    127.0.0.1:5555    kubernetes:64243    ESTABLISHED
TCP    127.0.0.1:5555    kubernetes:64261    ESTABLISHED
TCP    127.0.0.1:5555    kubernetes:64264    ESTABLISHED
TCP    127.0.0.1:5555    kubernetes:64269    ESTABLISHED
TCP    127.0.0.1:5555    kubernetes:64272    ESTABLISHED
TCP    127.0.0.1:5555    kubernetes:64273    ESTABLISHED