I'm trying to make my "AliveSystem" check thread-safe. It runs on every node in the cluster, sending alive-packets and checking whether each other node sent its alive-packet within the given time, allowing at most 3 failures.
gotAlive()
is executed outside when got an alive-packet
addNode()
or removeNode()
are executed when another node logs in/out
But when, for example, removeNode()
is executed — so that getNodes().isEmpty()
should be true, because there are definitely no other nodes left — getNodes().isEmpty()
is still false.
Maybe for better understanding, here is an example:
- node1 start
- enable AliveSystem (on node1)
- node2 start
- enable AliveSystem (on node2)
- addNode(node1) (on node2)
- addNode(node2) (on node1)
- stop node1
- removeNode(node1) (on node2)
- AliveSystem still checks for node1 (on node2)
The AliveSystem class:
import com.google.common.collect.Maps;
import lombok.extern.log4j.Log4j2;
import net.mypvp.cloud.cluster.packets.NodeAlivePacket;
import net.mypvp.core.network.redwork.RedworkManager;
import java.io.Closeable;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@Log4j2
public class AliveSystem implements Closeable {
// Period in millis for sending/receiving alive packets
private static final long DEFAULT_ALIVE_PERIOD = 2000L;
private static final int DEFAULT_CHECK_ALIVE_FAILS = 3;
private final String CHANNEL_TOPIC;
private final long ALIVE_PERIOD;
private final int CHECK_ALIVE_FAILS;
private final ScheduledExecutorService SEND_NODE_ALIVE_SCHEDULER;
private final ScheduledExecutorService CHECK_NODES_ALIVE_SCHEDULER;
// Map<Node, last alive-packet millis>>
private final Map<Node, Long> ALIVE_PACKETS = Maps.newConcurrentMap();
// Map<Node, Fails counter>
private final Map<Node, Integer> FAILS = Maps.newConcurrentMap();
/**
* Constructor.
*
* @param channelTopic where to send the keep-alive-packets to
* @param nodeDeathConsumer consumer executed on node death
*/
AliveSystem( String channelTopic, Consumer<Node> nodeDeathConsumer ) {
this( channelTopic, DEFAULT_ALIVE_PERIOD, DEFAULT_CHECK_ALIVE_FAILS, nodeDeathConsumer );
}
/**
* Constructor.
*
* @param channelTopic where to send the keep-alive-packets to
* @param alivePeriod period to send new keep-alive-packet
* @param checkAliveFails number of fails before node death
* @param nodeDeathConsumer consumer executed on node death
*/
AliveSystem( String channelTopic, long alivePeriod, int checkAliveFails, Consumer<Node> nodeDeathConsumer ) {
CHANNEL_TOPIC = channelTopic;
ALIVE_PERIOD = alivePeriod;
CHECK_ALIVE_FAILS = checkAliveFails;
// Send node alive scheduler.
SEND_NODE_ALIVE_SCHEDULER = Executors.newScheduledThreadPool( 5 );
SEND_NODE_ALIVE_SCHEDULER.scheduleAtFixedRate( ( ) -> {
if ( getClusterManager().isNodeLoggedIn() && !getNodes().isEmpty() )
getRedwork().sendPacketAsync( new NodeAlivePacket(), CHANNEL_TOPIC );
}, 1L, ALIVE_PERIOD, TimeUnit.MILLISECONDS );
// Receive node alive scheduler.
CHECK_NODES_ALIVE_SCHEDULER = Executors.newSingleThreadScheduledExecutor();
CHECK_NODES_ALIVE_SCHEDULER.scheduleAtFixedRate( ( ) -> {
if ( getNodes().isEmpty() ) return;
long current = System.currentTimeMillis();
getNodes().forEach( node -> {
try {
log.error( "SYS: Check node {}", node.getNodeId() );
// Check if alive packet got in time
if ( current - lastAlive( node ) <= ALIVE_PERIOD + 500L ) return;
// Fail
if ( addFail( node ) < CHECK_ALIVE_FAILS ) return;
// Max fails reached > death
gotAlive( node ); // reset stats
nodeDeathConsumer.accept( node );
} catch ( Exception e ) {
e.printStackTrace();
}
} );
}, 1L, ALIVE_PERIOD / 2L, TimeUnit.MILLISECONDS );
}
void addNode( Node node ) {
gotAlive( node );
}
void removeNode( Node node ) {
log.error( "SYS: Removed node {}", node.getNodeId() );
ALIVE_PACKETS.remove( node );
FAILS.remove( node );
System.out.println( "1STILL NODES: " + ALIVE_PACKETS.keySet() );
}
void gotAlive( Node node ) {
ALIVE_PACKETS.put( node, System.currentTimeMillis() );
FAILS.put( node, 0 );
}
private int addFail( Node node ) {
return FAILS.put( node, FAILS.get( node ) + 1 );
}
private long lastAlive( Node node ) {
System.out.println( "2STILL NODES: " + ALIVE_PACKETS.keySet() );
return ALIVE_PACKETS.get( node );
}
private Set<Node> getNodes( ) {
System.out.println( "3STILL NODES: " + ALIVE_PACKETS.keySet() );
return ALIVE_PACKETS.keySet();
}
private ClusterManager getClusterManager( ) {
return ClusterManager.getInstance();
}
private RedworkManager getRedwork( ) {
return getClusterManager().getRedwork();
}
@Override
public void close( ) {
SEND_NODE_ALIVE_SCHEDULER.shutdownNow();
CHECK_NODES_ALIVE_SCHEDULER.shutdownNow();
}
}
Exception:
java.lang.NullPointerException
at net.mypvp.cloud.cluster.AliveSystem.lambda$null$1(AliveSystem.java:83)
at java.util.concurrent.ConcurrentHashMap$KeySetView.forEach(Unknown Source)
at net.mypvp.cloud.cluster.AliveSystem.lambda$new$2(AliveSystem.java:78)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.runAndReset(Unknown Source)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(Unknown Source)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
java.lang.NullPointerException