0

I have to build a JAVA Nio Server Application in JBoss to read data from a 10-200 Sensor Boxes. They open a stream and send data to me all the time. The comunication is bidirectional. Now, sometimes it can happen, that these Boxes (or the server) have some internal error. To detect this kind of problems, an observer thread checks every 5 seconds, if a data block came in since the last check. If none of my Boxes sent data till then, something bad happened and I want to restart the whole socket comunication.

Now, it is well documentated how to build up a socket connection with NIO, but it is harder to find complexe examples how to clean reset them. And here is my problem: when my watchdog detects that no data came in the last 5s, it calls close() and then startEngine(). But after that, still no data arrive. Something seems blocked, some ressource still associated or like that. If I restart my JBoss, data arrive again. Can somebody give me a hint?

thank you for your time! Stefan

public class TestServer 
{
  private NIOServer server;
  private HashMap<String, SocketChannel> clientsList = new HashMap<String, SocketChannel>();

  class NIOServer extends Thread 
  {
        class MessageBuffer
        {
              int [] msgAsByte = new int[msgSize];
              int pos = 0;
              int lastSign = 0;                                    
              int bytesRead = 0;
        }
        private ByteBuffer readBuffer = ByteBuffer.allocate(256);
        private Selector selector;
        private boolean stop = false;
        private int[] ports;
        private int msgSize = 48;
        private HashMap<String,MessageBuffer> buffer = new HashMap<String, MessageBuffer>();

        private List<ServerSocketChannel> channels;
        // Maps a SocketChannel to a list of ByteBuffer instances
        private Map<SocketChannel, List<ByteBuffer>> pendingDataToWrite = new HashMap<SocketChannel, List<ByteBuffer>>();

        public NIOServer(int[] ports) {
              this.ports = ports;
        }

        private void stopAll()
        {
              stop = true;

              try 
              {
                    server.interrupt();
                    server.join(3000);
              } 
              catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
              }
              closeConnections();
        }

        public void sendData(SocketChannel socket, byte[] data) 
        { 
              // And queue the data we want written
              synchronized (this.pendingDataToWrite) {
                    List<ByteBuffer> queue = (List<ByteBuffer>) this.pendingDataToWrite.get(socket);
                    if (queue == null) {
                          queue = new ArrayList<ByteBuffer>();
                          this.pendingDataToWrite.put(socket, queue);
                    }
                    queue.add(ByteBuffer.wrap(data));
              }

              SelectionKey key = socket.keyFor(this.selector);
              if(key != null)
                    key.interestOps(SelectionKey.OP_WRITE);
              // Finally, wake up our selecting thread so it can make the required changes
              this.selector.wakeup();
        }

        public void run() 
        {
              try
              {
                    stop = false;
                    selector = Selector.open();
                    channels = new ArrayList<ServerSocketChannel>();
                    ServerSocketChannel serverchannel;
                    for (int port : ports) 
                    {
                          try
                          {
                                serverchannel = ServerSocketChannel.open();
                                serverchannel.configureBlocking(false);
                                try
                                {
                                      serverchannel.socket().setReuseAddress(true);
                                }
                                catch(SocketException se)
                                {
                                      //
                                }
                                serverchannel.socket().bind(new InetSocketAddress(port));
                                serverchannel.register(selector, SelectionKey.OP_ACCEPT);
                                channels.add(serverchannel);
                          }
                          catch(Exception e)
                          {
                                //
                          }
                    }
                    while (!stop) 
                    {

                          SelectionKey key = null;
                          try 
                          {
                                selector.select();
                                Iterator<SelectionKey> keysIterator = selector.selectedKeys()
                                            .iterator();
                                while (keysIterator.hasNext()) 
                                {
                                      key = keysIterator.next();

                                      if(key.isValid())
                                      {
                                            if (key.isAcceptable()) 
                                            {
                                                  accept(key);
                                            } 
                                            else if (key.isReadable()) 
                                            {
                                                  readData(key);
                                            } 
                                            else if (key.isWritable()) 
                                            {
                                                  writeData(key);
                                            }
                                      }
                                      else
                                      {
                                            SocketChannel sc = (SocketChannel) key.channel(); 
                                      }
                                      keysIterator.remove();
                                }
                          }
                          catch ( Exception e) 
                          {
                                if(e instanceof IOException || e instanceof ClosedSelectorException)
                                {
                                      try
                                      {
                                            ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                                            channels.remove(ssc);
                                            ssc.close();
                                            key.cancel();
                                      }
                                      catch(Exception ex)
                                      {
                                            //
                                      }

                                }
                                else
                                {
                                      //
                                }
                          }
                    } 
              }
              catch(Exception e1)
              {
                    //
              }

              closeConnections();

        }

        private void closeConnections()
        {
              //if thread is stopped, close all 
              try
              {
                    try 
                    {
                          if(this.selector == null || this.selector.keys() == null)
                          {
                                log.debug("No selectors or keys found to close");
                          }
                          else
                          {
                                Iterator<SelectionKey> keys = this.selector.keys().iterator();
                                while(keys.hasNext()) 
                                {
                                      SelectionKey key = keys.next();
                                      key.cancel();
                                }
                          }
                    }
                    catch(Exception ex) {
                          //
                    }
                    if(selector != null)
                          selector.close();
                    if(channels != null)
                    {
                          for(ServerSocketChannel channel:channels)
                          {
                                channel.socket().close();
                                channel.close();
                          }
                    }

                    if(clientsList != null)
                    {
                          Iterator<Map.Entry<String, SocketChannel>> hfm = clientsList.entrySet().iterator();
                          while(hfm.hasNext()) 
                          {
                                Map.Entry<String, SocketChannel> s = hfm.next();
                                s.getValue().close();
                          }
                    }
                    clientsList=null;

                    selector = null;
                    channels = null;
                    pendingDataToWrite = null;
              }
              catch(Exception e)
              {
                    //
              }

        }

        private void accept(SelectionKey key) throws IOException 
        {

              ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
              SocketChannel sc = ssc.accept();
              sc.configureBlocking(false);
              sc.register(selector, SelectionKey.OP_READ);

              String ip = sc.socket().getRemoteSocketAddress().toString();
              if(!buffer.containsKey(ip))
                    buffer.put(ip, new MessageBuffer());
        }

        private void readData(SelectionKey key) throws Exception
        {

              SocketChannel sc = (SocketChannel) key.channel();      

              MessageBuffer buf = buffer.get(sc.socket().getRemoteSocketAddress().toString());
              try
              {
                    buf.bytesRead = sc.read(readBuffer); //read into buffer.
              }
              catch(Exception e2)
              {
                    sc.close();
                    buffer.remove(sc);
              }

              //close connection
              if (buf.bytesRead == -1)
              {
                    sc.close();
                    key.cancel();
                    return;
              }

              readBuffer.flip();      //make buffer ready for read

              while(readBuffer.hasRemaining())
              {
                    //Read the data and forward it to another Process...
              }

              readBuffer.compact(); //make buffer ready for writing

        }

        private void writeData(SelectionKey key) throws Exception
        {
              SocketChannel socketChannel = (SocketChannel) key.channel();
              synchronized (this.pendingDataToWrite) {
                    List queue = (List) this.pendingDataToWrite.get(socketChannel);

                    // Write until there's not more data ...
                    while (!queue.isEmpty()) {
                          ByteBuffer buf = (ByteBuffer) queue.get(0);
                          try
                          {
                                socketChannel.write(buf);
                          }
                          catch(Exception e)
                          {
                                //
                          }
                          finally
                          {
                                queue.remove(0);
                          }
                          if (buf.remaining() > 0) {
                                // ... or the socket's buffer fills up
                                break;
                          }
                    }

                    key.interestOps(SelectionKey.OP_READ);
              }
        }
  }



  public void close() {

        if (server != null && server.isAlive()) 
        {      
                    server.stopAll(); 
        }
        if(clientsList != null)
        {
              clientsList.clear();
        }
        server = null;

  }

  public void startEngine(int[] ports) {
        if (ports != null) {
              for (int port : ports)
                    log.info("Listening on port " + port);
              server= new NIOServer(ports);
              server.start();
        }
  }

}
user3354754
  • 79
  • 1
  • 7

1 Answers1

1

Use a select() timeout.

If the timeout happens, close all the registered SocketChannels.

If you want to get more fine-grained, keep track of the last I/O time on each channel, and close those that have expired at the bottom of each select() loop.

NB Your technique for OP_WRITE is not correct. There are many answers here showing how to use it properly.

user207421
  • 305,947
  • 44
  • 307
  • 483
  • thank you for your input, I suppose that you mean your feedback to threads like this https://stackoverflow.com/questions/17556901/java-high-load-nio-tcp-server? Meaning that I should write when I want to write and only if this operation returns zero register the OP_WRITE. Correct? The select + timeout helps me to recognize when no new data arrive, but it didn't fix my problem that after calling close() and restartEngine() I still don't get new data... – user3354754 Jul 19 '17 at 12:43
  • You won't get anything until the clients reconnect. – user207421 Jul 19 '17 at 12:53
  • well, when I restart my jboss they reconnect and all works fine. But if I only close the sockets the don't do it. Is there a way to force them to reconnect – user3354754 Jul 19 '17 at 13:17
  • Of course not. There is no connection via which to tell them to do so. They will have to just time out or whatever they do. – user207421 Jul 19 '17 at 16:27
  • hmmm, then there must be some error in my code, close+restartEngine is called but no data come after that. Restarting the JBoss helps, so the clients do what they should do... any hint? – user3354754 Jul 24 '17 at 05:50
  • Sounds like a problem at the client end. – user207421 Aug 26 '17 at 01:03