I'm building a job/task distributor that works the same way as your crawler, in principle at least. Here are a few things I've learned:
Define All Events
Communication between server and crawlers will be based on different things happening in your system, such as dispatching work from server to crawler, or a crawler sending a heartbeat message to the server. Define the system's event types; they are the use cases:
DISPATCH_WORK_TO_CRAWLER_EVENT
CRAWLER_NODE_STATUS_EVENT
...
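As a rough sketch, the event types could live in a Java enum so both sides share one definition. Only the two events named above are included; the enum name `CrawlerEvent` and the `parse` helper are my own naming for illustration, not part of any library.

```java
import java.util.Optional;

/** Event types named in the text; extend as more use cases appear. */
enum CrawlerEvent {
    DISPATCH_WORK_TO_CRAWLER_EVENT,
    CRAWLER_NODE_STATUS_EVENT;

    /** Maps the text of an event-type frame to an event; empty if unknown. */
    static Optional<CrawlerEvent> parse(String frame) {
        if (frame == null) {
            return Optional.empty();
        }
        try {
            return Optional.of(CrawlerEvent.valueOf(frame.trim()));
        } catch (IllegalArgumentException e) {
            // valueOf throws for names that are not enum constants
            return Optional.empty();
        }
    }
}
```

Parsing into an `Optional` means an unknown or malformed event name can be logged and dropped rather than crashing the receive loop.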
Define a Message Standard
All communication between server and crawlers should be done using ZMsgs, so define a standard that organizes your frames, something like this:
Frame1: "Crawler v1.0" //this is a static header
Frame2: <event type> //ex: "CRAWLER_NODE_STATUS_EVENT"
Frame3: <content xml/json/binary> //content that applies to this event (if any)
Now you can create message validators to validate ZMsgs received between peers since you have a standard convention all messages must follow.
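A validator for that convention might look roughly like the sketch below. It assumes the frames have already been converted to plain strings and that, on the server side, the address frame the ROUTER prepends has been removed first. The class name `FrameValidator` and its members are hypothetical, chosen just for this example.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

/** Checks a list of frames against the header / event type / content layout. */
final class FrameValidator {
    static final String HEADER = "Crawler v1.0";
    static final Set<String> KNOWN_EVENTS = new HashSet<>(Arrays.asList(
            "DISPATCH_WORK_TO_CRAWLER_EVENT",
            "CRAWLER_NODE_STATUS_EVENT"));

    /** Returns a description of the first problem found, or empty if valid. */
    static Optional<String> validate(List<String> frames) {
        // Frame3 (content) is optional, so 2 or 3 frames are acceptable
        if (frames == null || frames.size() < 2 || frames.size() > 3) {
            return Optional.of("expected 2 or 3 frames");
        }
        if (!HEADER.equals(frames.get(0))) {
            return Optional.of("bad header frame");
        }
        if (!KNOWN_EVENTS.contains(frames.get(1))) {
            return Optional.of("unknown event type");
        }
        return Optional.empty();
    }
}
```

Running every received message through one check like this keeps malformed or stray traffic out of the event-handling code.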
Server
Use a single ROUTER socket on the server for asynchronous, bidirectional communication with the crawlers. Also, use a PUB socket for broadcasting heartbeat messages.
Don't block on the ROUTER socket; use a Poller with a timeout of 5s or whatever suits you. This lets the server do other things periodically, like broadcast heartbeat events to the crawlers; something like this:
Socket rtr = .. //ZMQ.ROUTER
Socket pub = .. //ZMQ.PUB
//only the ROUTER is polled; a PUB socket is send-only and never readable
ZMQ.Poller poller = new ZMQ.Poller(1);
poller.register(rtr, ZMQ.Poller.POLLIN);
while (true) {
    ZMsg msg = null;
    //wait up to 5s for a message from a crawler
    poller.poll(5000);
    if (poller.pollin(0)) {
        //messages from crawlers
        msg = ZMsg.recvMsg(rtr);
    }
    //send heartbeat messages
    ZMsg heartbeatMsg = ...
    //create message content here,
    //publish to all crawlers
    heartbeatMsg.send(pub);
}
To address your question about worker awareness, a simple and effective method uses a FIFO queue along with the heartbeat messages; something like this:
- server maintains a simple FIFO queue in memory
- server sends out heartbeats; crawlers respond with their node name; the ROUTER automatically prepends the address of the node to the message as well (read up on message enveloping)
- for each response, push one object onto the queue containing the node name and node address
- when the server wants to dispatch work to a crawler, just pop the next object from the queue, create the message and address it properly (using the node address), and off it goes to that worker
- dispatch more work to other crawlers the same way; when a crawler responds back to the server, just push another object with its node name/address back on the queue; busy workers won't be available until they respond, so we don't bother them.
This is a simple but effective method of distributing work based on worker availability instead of blindly sending out work. Check the lbbroker.php example; the concept is the same.
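The availability queue itself needs nothing from ZeroMQ, so here is a minimal sketch using only the JDK. The names `AvailableWorkers`, `markReady` and `next` are hypothetical. One assumption of mine that goes beyond the steps above: a node that answers several heartbeats while idle is queued only once, which is why this uses a `LinkedHashMap` keyed by node name rather than a bare queue.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/** FIFO of crawlers that have reported in and are waiting for work. */
final class AvailableWorkers {
    /** Node name plus the address frame supplied by the ROUTER. */
    static final class Worker {
        final String name;
        final byte[] address;

        Worker(String name, byte[] address) {
            this.name = name;
            this.address = address;
        }
    }

    // Insertion-ordered map: FIFO order, one entry per node name
    private final LinkedHashMap<String, byte[]> ready = new LinkedHashMap<>();

    /** Call when a crawler responds; keeps its place if already queued. */
    void markReady(String name, byte[] address) {
        ready.putIfAbsent(name, address);
    }

    /** Removes and returns the longest-waiting worker, or null if none. */
    Worker next() {
        Iterator<Map.Entry<String, byte[]>> it = ready.entrySet().iterator();
        if (!it.hasNext()) {
            return null;
        }
        Map.Entry<String, byte[]> entry = it.next();
        // Copy the fields out before removing the entry from the map
        Worker worker = new Worker(entry.getKey(), entry.getValue());
        it.remove();
        return worker;
    }

    int size() {
        return ready.size();
    }
}
```

The server would call `markReady` for every status message read from the ROUTER, and `next` whenever it has work to hand out; a `null` result means every known crawler is busy, so the work should wait.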
Crawler (Worker)
The worker should use a single DEALER socket along with a SUB. The DEALER is the main socket for async communication, and the SUB subscribes to heartbeat messages from the server. When the worker receives a heartbeat message, it responds to the server on the DEALER socket.
Socket dlr = .. //ZMQ.DEALER
Socket sub = .. //ZMQ.SUB, subscribed to the server's heartbeats
ZMQ.Poller poller = new ZMQ.Poller(2);
poller.register(dlr, ZMQ.Poller.POLLIN); //index 0
poller.register(sub, ZMQ.Poller.POLLIN); //index 1
while (true) {
    ZMsg msg = null;
    poller.poll(5000);
    if (poller.pollin(0)) {
        //message from server
        msg = ZMsg.recvMsg(dlr);
    }
    if (poller.pollin(1)) {
        //heartbeat message from server
        msg = ZMsg.recvMsg(sub);
        //reply back with status
        ZMsg statusMsg = ...
        statusMsg.send(dlr);
    }
}
The rest you can figure out on your own. Work through the PHP examples, build stuff, break it, build more; it's the only way you'll learn!
Have fun, hope it helps!