
I'm trying to get concurrency right in my "AliveSystem" check class, which runs on every node in the cluster. It sends alive-packets and checks whether each other node sent its alive-packet within the given period, allowing at most 3 fails.

gotAlive() is called from outside whenever an alive-packet is received.

addNode() and removeNode() are called when another node logs in or out.

But when, for example, removeNode() is executed and there are definitely no other nodes left, so getNodes().isEmpty() should be true, getNodes().isEmpty() is still false.

Maybe for better understanding, here is an example:

  1. node1 start
  2. enable AliveSystem (on node1)
  3. node2 start
  4. enable AliveSystem (on node2)
  5. addNode(node1) (on node2)
  6. addNode(node2) (on node1)
  7. stop node1
  8. removeNode(node1) (on node2)
  9. AliveSystem still checks for node1 (on node2)

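For reference: Maps.newConcurrentMap() returns a ConcurrentHashMap, and keySet() on it is a live view backed by the map, so a successful remove() should be visible through getNodes() immediately. A minimal JDK-only sketch of that expectation (plain Strings stand in for Node; the class name is just illustrative):

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class KeySetViewDemo {

    public static void main( String[] args ) {
        // Map<node id, last alive-packet millis> -- Strings stand in for Node
        Map<String, Long> alivePackets = new ConcurrentHashMap<>();
        Set<String> nodes = alivePackets.keySet(); // live view backed by the map

        alivePackets.put( "node1", System.currentTimeMillis() );
        System.out.println( nodes.isEmpty() ); // false

        alivePackets.remove( "node1" );
        System.out.println( nodes.isEmpty() ); // true -- the removal is visible through the view
    }
}
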
The AliveSystem class:

import com.google.common.collect.Maps;
import lombok.extern.log4j.Log4j2;
import net.mypvp.cloud.cluster.packets.NodeAlivePacket;
import net.mypvp.core.network.redwork.RedworkManager;

import java.io.Closeable;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

@Log4j2
public class AliveSystem implements Closeable {
    
    // Period in millis for sending/receiving alive packets
    private static final long DEFAULT_ALIVE_PERIOD = 2000L;
    private static final int DEFAULT_CHECK_ALIVE_FAILS = 3;
    
    
    private final String CHANNEL_TOPIC;
    private final long ALIVE_PERIOD;
    private final int CHECK_ALIVE_FAILS;
    
    private final ScheduledExecutorService SEND_NODE_ALIVE_SCHEDULER;
    private final ScheduledExecutorService CHECK_NODES_ALIVE_SCHEDULER;
    
    // Map<Node, last alive-packet millis>>
    private final Map<Node, Long> ALIVE_PACKETS = Maps.newConcurrentMap();
    // Map<Node, Fails counter>
    private final Map<Node, Integer> FAILS = Maps.newConcurrentMap();
    
    /**
     * Constructor.
     *
     * @param channelTopic      where to send the keep-alive-packets to
     * @param nodeDeathConsumer consumer executed on node death
     */
    AliveSystem( String channelTopic, Consumer<Node> nodeDeathConsumer ) {
        this( channelTopic, DEFAULT_ALIVE_PERIOD, DEFAULT_CHECK_ALIVE_FAILS, nodeDeathConsumer );
    }
    
    /**
     * Constructor.
     *
     * @param channelTopic      where to send the keep-alive-packets to
     * @param alivePeriod       period to send new keep-alive-packet
     * @param checkAliveFails   number of fails before node death
     * @param nodeDeathConsumer consumer executed on node death
     */
    AliveSystem( String channelTopic, long alivePeriod, int checkAliveFails, Consumer<Node> nodeDeathConsumer ) {
        CHANNEL_TOPIC = channelTopic;
        ALIVE_PERIOD = alivePeriod;
        CHECK_ALIVE_FAILS = checkAliveFails;
        
        // Send node alive scheduler.
        SEND_NODE_ALIVE_SCHEDULER = Executors.newScheduledThreadPool( 5 );
        SEND_NODE_ALIVE_SCHEDULER.scheduleAtFixedRate( ( ) -> {
            if ( getClusterManager().isNodeLoggedIn() && !getNodes().isEmpty() )
                getRedwork().sendPacketAsync( new NodeAlivePacket(), CHANNEL_TOPIC );
        }, 1L, ALIVE_PERIOD, TimeUnit.MILLISECONDS );
        
        // Receive node alive scheduler.
        CHECK_NODES_ALIVE_SCHEDULER = Executors.newSingleThreadScheduledExecutor();
        CHECK_NODES_ALIVE_SCHEDULER.scheduleAtFixedRate( ( ) -> {
            if ( getNodes().isEmpty() ) return;
            long current = System.currentTimeMillis();
            getNodes().forEach( node -> {
                try {
                    log.error( "SYS: Check node {}", node.getNodeId() );
                    
                    // Check if alive packet got in time
                    if ( current - lastAlive( node ) <= ALIVE_PERIOD + 500L ) return;
                    
                    // Fail
                    if ( addFail( node ) < CHECK_ALIVE_FAILS ) return;
                    
                    // Max fails reached > death
                    gotAlive( node ); // reset stats
                    nodeDeathConsumer.accept( node );
                } catch ( Exception e ) {
                    e.printStackTrace();
                }
            } );
        }, 1L, ALIVE_PERIOD / 2L, TimeUnit.MILLISECONDS );
    }
    
    void addNode( Node node ) {
        gotAlive( node );
    }
    
    void removeNode( Node node ) {
        log.error( "SYS: Removed node {}", node.getNodeId() );
        ALIVE_PACKETS.remove( node );
        FAILS.remove( node );
        System.out.println( "1STILL NODES: " + ALIVE_PACKETS.keySet() );
    }
    
    void gotAlive( Node node ) {
        ALIVE_PACKETS.put( node, System.currentTimeMillis() );
        FAILS.put( node, 0 );
    }
    
    private int addFail( Node node ) {
        return FAILS.put( node, FAILS.get( node ) + 1 );
    }
    
    private long lastAlive( Node node ) {
        System.out.println( "2STILL NODES: " + ALIVE_PACKETS.keySet() );
        return ALIVE_PACKETS.get( node );
    }
    
    private Set<Node> getNodes( ) {
        System.out.println( "3STILL NODES: " + ALIVE_PACKETS.keySet() );
        return ALIVE_PACKETS.keySet();
    }
    
    private ClusterManager getClusterManager( ) {
        return ClusterManager.getInstance();
    }
    
    private RedworkManager getRedwork( ) {
        return getClusterManager().getRedwork();
    }
    
    @Override
    public void close( ) {
        SEND_NODE_ALIVE_SCHEDULER.shutdownNow();
        CHECK_NODES_ALIVE_SCHEDULER.shutdownNow();
    }
}

Exception:

java.lang.NullPointerException
    at net.mypvp.cloud.cluster.AliveSystem.lambda$null$1(AliveSystem.java:83)
    at java.util.concurrent.ConcurrentHashMap$KeySetView.forEach(Unknown Source)
    at net.mypvp.cloud.cluster.AliveSystem.lambda$new$2(AliveSystem.java:78)
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.util.concurrent.FutureTask.runAndReset(Unknown Source)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(Unknown Source)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Comments:

  • ConcurrentMap doesn't make your code magically thread safe. You need to use atomic methods like `putIfAbsent` or else use another level of locking or synchronization. – Daniel Pryden Mar 25 '18 at 17:27
  • Can you put an example? – RoboFlax Mar 25 '18 at 17:43
  • what do you mean with atomic methods? – RoboFlax Mar 25 '18 at 18:32
  • Given your comments, I get the sense that there's a lot about concurrency that you don't understand. It's hard to know how much to explain, but I suspect it would need to be much more than one answer here could contain. I would recommend the book *Java Concurrency in Practice* by Goetz et al as a good place to start. – Daniel Pryden Mar 25 '18 at 18:43
  • What is on line 83? – teppic Mar 25 '18 at 18:54
  • return ALIVE_PACKETS.get( node ); // in lastAlive() method – RoboFlax Mar 25 '18 at 18:59
  • Thanks to @DanielPryden pushing me in the right way! I've found https://dzone.com/articles/concurrenthashmap-isnt-always-enough – RoboFlax Mar 26 '18 at 14:13
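
Regarding the "atomic methods" mentioned in the comments above, here is a minimal sketch (independent of the question's code; a plain String key stands in for Node, and the class name is illustrative) of doing the read-modify-write on a ConcurrentMap in one atomic step with merge() instead of a separate get() and put():

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class AtomicUpdateSketch {

    // Map<node id, fails counter> -- a String key stands in for Node
    private final Map<String, Integer> fails = new ConcurrentHashMap<>();

    // Atomic read-modify-write: merge() inserts 1 for an absent key or adds 1 to the
    // existing value, and returns the new value -- no separate get() and put().
    int addFail( String nodeId ) {
        return fails.merge( nodeId, 1, Integer::sum );
    }

    // Reset on a received alive-packet.
    void gotAlive( String nodeId ) {
        fails.put( nodeId, 0 );
    }

    public static void main( String[] args ) {
        AtomicUpdateSketch sketch = new AtomicUpdateSketch();
        System.out.println( sketch.addFail( "node1" ) ); // 1
        System.out.println( sketch.addFail( "node1" ) ); // 2
        sketch.gotAlive( "node1" );
        System.out.println( sketch.addFail( "node1" ) ); // 1
    }
}

merge() performs the whole update on the entry in a single atomic step, so it cannot throw a NullPointerException for a missing key the way a separate get()-then-put() can.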
