0

I am working on a multi-client TCP server in Java. As the server must be able to perform a lot of simulaneous operations, I created a thread, used to send data to my client, through the socket. It's defined as follow :

public class MessageSender implements Runnable{

    private OutputStream output;
    private volatile ArrayList<byte[]> messagesToSend;

    public MessageSender(OutputStream _output)
    {
        output = _output;
        messagesToSend = new ArrayList<byte[]>(0);
    }

    public void run()
    {
        while (true)
        {
           if (messagesToSend.size() > 0)
           {
               sendMessage(messagesToSend.get(0));
               messagesToSend.remove(0);
           }
        }
    }

    public void setMessageToSend(Message message)
    {
        messagesToSend.add(message.getMessageHandler().getBytes());
    }

    public void sendMessage(byte[] bytes)
    {
        if (bytes == null)
        {
            System.out.println("bytes = null");
            return ;
        }
        try
        {
            output.write(bytes, 0, bytes.length);
        }
        catch (IOException e)
        {
            System.out.println(e);
        }
    }
}

The class Message, that you can see in setMessageToSend(), is a class of mine. I also have created other classes (Channel_test, Channel_status), corresponding to specific messages, and that inherit from Message class.

I have another thread, in which I create a Channel_status, as follow :

eventBus.consumer(msg + port , message -> {
    sender.setMessageToSend(new Channel_status(eParameters));
    channel_test = new Channel_test(eParameters);
    System.out.println("Created Message");
});

This is a Vertx handler, that is only called once, when receiving a specific message from the client. It is called, and channel_status is instansiated as expected. In this same thread, I have a Vertx timer, in which I send a byte array contained by my channel_status class, to the MessageSender.

vertx.setPeriodic(duration, id -> {
    System.out.println(Arrays.toString(channel_test.getMessageHandler().getBytes()));
    sender.setMessageToSend(channel_test);
});

As you can see, I am printing it's value to make sure it is not null. And, it's never null in the timer. But when the MessageSender receives it, sometimes it is null (by sometimes I mean approximately 1/50 times). Why? Is it a concurrency issue?

Thanks.

xingbin
  • 27,410
  • 9
  • 53
  • 103
souki
  • 1,305
  • 4
  • 23
  • 39

2 Answers2

2

Making ArrayList<byte[]> volatile does not gurantee that when you add new element in thread1, thread2 can see it.

You should use a list that support concurrency, such as :

messagesToSend  = Collections.synchronizedList(new ArrayList<byte[]>(0));

This can make sure when you add new element by setMessageToSend, the new element will be visible in run method.

Update:

Collections.synchronizedList still can not gurantee thread safe in this case. Since in run method, the three operations are not atomic.

if (messagesToSend.size() > 0)
{
    sendMessage(messagesToSend.get(0));  
    messagesToSend.remove(0);
}

A BlockingQueue can solve this problem:

private BlockingQueue<byte[]> messagesToSend;

public MessageSender(OutputStream _output) {
    output = _output;
    messagesToSend = new LinkedBlockingQueue<>();
}

public void run() {
    while (true) {
        try {
            sendMessage(messagesToSend.take());  // this will get blocked untill the queue is not empty
        } catch (InterruptedException e) {

        }
    }
}

public void setMessageToSend(Message message) {
    try {
        messagesToSend.put(message.getMessageHandler().getBytes()); // put new element in the queue, check if run method is blocked, wake it up
    } catch (InterruptedException e) {

    }
}
xingbin
  • 27,410
  • 9
  • 53
  • 103
  • wait and notify still needed for less cpu cycle in the while loop – Hemant Patel May 02 '18 at 15:15
  • 1
    @HemantPatel Yes, `wait` and `notify` is needed. And I think a `BlockingQueue` is more clean than call `wait` and `notify`. – xingbin May 02 '18 at 15:28
  • One question though. BlockingQueue is an abstract class, and cannot be instantiated right? Maybe I should use LinkedBlockingQueue? – souki May 02 '18 at 15:32
0

Change your run method as

public void run() {
    while (true) {
        synchronized(messagesToSend) {
           while (messagesToSend.isEmpty()) {
               messagesToSend.wait();
           }
        }
        sendMessage(messagesToSend.get(0));
        messagesToSend.remove(0);
   }

}

And setMessageToSend() as

public void setMessageToSend(Message message) {
    messagesToSend.add(message.getMessageHandler().getBytes());
    messagesToSend.notify();
}
Hemant Patel
  • 3,160
  • 1
  • 20
  • 29