
After browsing some other threads about my problem, I think I've understood that I need to redesign my application. But just for clarification: I have a single TCP/IP connection between a client and a server. On the client side, a number of threads run concurrently, and at random one or more of them use the TCP/IP connection to communicate with the server. I've found that, e.g. while a long-running file transfer is active, using the connection from another thread concurrently can lead to errors. Although I precede each message with a header that includes the data length, it appears that the IP stack sometimes delivers a mix of more than one message to my program: although one message has not yet been delivered completely, part of another message is delivered to my read method. Is this a correct observation that matches the intended TCP/IP behaviour? Thanks in advance - Mario

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

For anybody who's interested: the following is the source code of my test program. You may play with various values for BUFFER_SIZE and for the number of THREADS used to bombard the server socket with concurrent TCP/IP sends over the same socket. I've left out some error handling and removed a more sophisticated termination, including the closing of the sockets. Testing with a BUFFER_SIZE greater than 64KB always leads to errors on my machine.

import java.io.*;
import java.net.*;
import java.nio.ByteBuffer;

public class TCPTest
{
  private final static String INPUT_FILE   = "c:/temp/tcptest.in";
  private final static int    BUFFER_SIZE  = 64 * 1024 - 8; //65536;
  private final static int    MESSAGE_SIZE = 512 * 64 * 1024;
  private final static int    THREADS      = 3;
  private final static int    SIZE_OF_INT  = 4;
  private final static int    LENGTH_SIZE  = SIZE_OF_INT;
  private final static int    ID_SIZE      = SIZE_OF_INT;
  private final static int    HEADER_SIZE  = LENGTH_SIZE + ID_SIZE;
  private final static String NEW_LINE     = System.getProperty("line.separator");

  private ServerSocket m_serverSocket = null;
  private Socket       m_clientSocket = null;
  private int          m_iThreadCounter;

  public static void main(String[] args)  
  {
    new TCPTest();
  } // main

  public TCPTest() 
  {
    final String id = "ReaderThread[*]";
    // start a new thread creating a server socket waiting for connections 
    new Thread(new Runnable() 
    {
      public void run() 
      {
        try 
        {
          // create server socket and accept client requests
          m_serverSocket = new ServerSocket(12345);
          m_clientSocket = m_serverSocket.accept();
          // client request => prepare and read data
          long       startTime       = System.currentTimeMillis();
          byte[]     buffer          = new byte[BUFFER_SIZE];
          ByteBuffer header          = ByteBuffer.allocate(HEADER_SIZE);
          int        iTotalBytesRead = 0;
          boolean    fTerminate      = false;
          int        iBytesRead;
          // get hold of socket's input stream
          InputStream clientInputStream = m_clientSocket.getInputStream();
          // loop
          while (false == fTerminate)
          {
            // loop to read next header
            for (int i = 0; i < HEADER_SIZE; i++)
              clientInputStream.read(header.array(), i, 1);
            header.rewind();
            // get information of interest
            int iLength      = header.getInt();
            int iId          = header.getInt();
            int iLengthSoFar = 0;
            int iBytesLeft   = iLength;
            int iBytesToRead;
            // any length given?
            if ((0 < iLength) && (BUFFER_SIZE >= iLength))
            {
              // that's the case => read complete message
              while (iLengthSoFar < iLength)
              {
                // calculate number of bytes left
                iBytesLeft = iLength - iLengthSoFar;
                // calculate maximum number of bytes to read
                if (iBytesLeft > BUFFER_SIZE)
                  iBytesToRead = BUFFER_SIZE;
                else
                  iBytesToRead = iBytesLeft;
                // read next portion of bytes
                if ((iBytesRead = clientInputStream.read(buffer, 0, iBytesToRead)) != -1)
                {
                  // maintain statistics
                  iTotalBytesRead += iBytesRead;
                  iLengthSoFar += iBytesRead;
                } // if
                else
                {
                  // finish => print message
                  System.out.println("==> "+id+": ERROR length=<-1> received " +
                        "for id=<"+iId+">");
                  fTerminate = true;
                  break;
                } // else
              } // while
            } // if
            else
            {
              System.out.println("==> "+id+": ERROR invalid data length=<"+iLength+"> for id=<"+iId+">");
              dump(header, 0, HEADER_SIZE / SIZE_OF_INT, "Error header");
            } // else
          } // while
          System.out.println("==> "+id+": "+ iTotalBytesRead + " bytes read in " 
                              + (System.currentTimeMillis() - startTime) + " ms.");
        } // try 
        catch (IOException e) 
        {
          e.printStackTrace();
        } // catch
      } // run
    }).start();
    // create the socket writer threads
    try
    {
      // ensure server is brought up and request a connection
      Thread.sleep(1000);
      System.out.println("==> "+id+": just awoke");
      Socket       socket             = new Socket("localhost", 12345);
      OutputStream socketOutputStream = socket.getOutputStream();
      System.out.println("==> "+id+": socket obtained");
      // create some writer threads
      for (int i = 0; i < THREADS; i++)
        // create a new socket writer and start the thread
        (new SocketWriter(socket, 
                          (i+1),
                          BUFFER_SIZE,
                          new String("WriterThread["+(i+1)+"]"),
                          socketOutputStream)).start();
    } // try
    catch (Exception e)
    {
      e.printStackTrace();
    } // catch
  } // TCPTest

  private final static void dump(ByteBuffer bb, int iOffset, int iInts, String header)
  {
    System.out.println(header);
    bb.rewind();
    for (int i = 0; i < iInts; i++)
      System.out.print(" " + Integer.toHexString(bb.getInt()).toUpperCase());
    System.out.print(NEW_LINE);
  } // dump

  private class SocketWriter extends Thread
  {
    Socket       m_socket;
    int          m_iId;
    int          m_iBufferSize;
    String       m_id;
    OutputStream m_os;

    protected SocketWriter(Socket socket, int iId, int iBufferSize, String id, OutputStream os)
    {
      m_socket       = socket;
      m_iId          = iId;
      m_iBufferSize  = iBufferSize;
      m_id           = id;
      m_os           = os;
      // increment thread counter
      synchronized (m_serverSocket)
      {
        m_iThreadCounter++;
      } // synchronized
    } // SocketWriter

    public final void run()
    {
      try 
      {
        long       startTime        = System.currentTimeMillis();
        ByteBuffer buffer           = ByteBuffer.allocate(m_iBufferSize + HEADER_SIZE); 
        int        iTotalBytesRead  = 0;
        int        iNextMessageSize = 512 * m_iBufferSize; 
        int        iBytesRead;
        // open input stream for file to read and send
        FileInputStream fileInputStream = new FileInputStream(INPUT_FILE);
        System.out.println("==> "+m_id+": file input stream obtained");
        // loop to read complete file
        while (-1 != (iBytesRead = fileInputStream.read(buffer.array(), HEADER_SIZE, m_iBufferSize))) 
        {
          // add length and id to buffer and write over TCP
          buffer.putInt(0, iBytesRead);
          buffer.putInt(LENGTH_SIZE, m_iId);
          m_os.write(buffer.array(), 0, HEADER_SIZE + iBytesRead);
          // maintain statistics and print message if so desired
          iTotalBytesRead += iBytesRead;
          if (iNextMessageSize <= iTotalBytesRead)
          {
            System.out.println("==> "+m_id+": <"+iTotalBytesRead+"> bytes processed");
            iNextMessageSize += MESSAGE_SIZE;
          } // if
        } // while
        // close my file input stream
        fileInputStream.close();
        System.out.println("==> "+m_id+": file input stream closed");
        System.out.println("==> "+m_id+": <"+ iTotalBytesRead + "> bytes written in " 
                            + (System.currentTimeMillis() - startTime) + " ms.");
        // decrement thread counter
        synchronized (m_serverSocket)
        {
          m_iThreadCounter--;
          // last thread?
          if (0 >= m_iThreadCounter)
            // that's the case => terminate
            System.exit(0);
        } // synchronized
      } // try 
      catch (Exception e) 
      {
        e.printStackTrace();
      } // catch
    } // run
  } // SocketWriter
} // TCPTest
  • What mechanism do you use to handle concurrent access to the same data in InputStream? If you don't use any, that looks like the reason for the mixed-up messages (it would also be nice to see example code). – Raman Feb 18 '13 at 15:33
  • TCP is just a byte stream, not messages. If you write to that stream concurrently from many threads without proper locking, you will get interleaving; the interleaving would usually be at the boundaries of a single operating-system-level write() call on the socket. – nos Feb 18 '13 at 15:34
  • That sounds like the behavior I would expect. My suggestion would be that instead of having the various threads access the TCP socket directly, dedicate a single thread to handling the socket I/O and have the other threads send messages to that thread for it to send to the socket. That way entire messages will get queued up and sent in order, with no risk of partial messages getting interleaved with each other. – Jeremy Friesner Feb 18 '13 at 16:39
  • @timonk: Thanks but that's not an issue! – SuperMarioJava Feb 19 '13 at 12:44
  • @nos: Thanks. If you were right, everything would be okay for me. But as far as I've found out this is not true for single TCP sends bigger than the maximum allowable TCP window advertisement of 64K. – SuperMarioJava Feb 19 '13 at 12:45
  • @Jeremy: Thanks. I think that's what I'm going to do. Additionally, I'll open a new data connection for each long-running file transfer, e.g. as FTP does. – SuperMarioJava Feb 19 '13 at 12:45
  • @SuperMarioJava Thus the "usually". I'm sure there's other cases as well. Being java, you don't always get operating system level access though, so a single write to a socket stream might in some cases not end up being a single write() call down to the operating system. – nos Feb 19 '13 at 12:47
  • @nos: You're right. So I think I have to do a bit of redesign in order to make my program waterproof! Thanks! – SuperMarioJava Feb 19 '13 at 13:09
  • I would suggest trying the Boost socket libraries instead. – Rohit Feb 19 '13 at 13:10
  • @PowerPC: Might be a problem with Java!? – SuperMarioJava Feb 19 '13 at 13:17
  • You can use the NDK; refer to http://stackoverflow.com/questions/14036311/official-boost-library-support-for-android-and-ios for compiling Boost on Android. Boost is one of the best ways. – Rohit Feb 19 '13 at 13:19
  • @PowerPC: Thanks, but I don't need Android! – SuperMarioJava Feb 19 '13 at 13:34
  • Oh, sorry, the Boost libraries are C++, so it would probably involve JNI to communicate with a Java app. – Rohit Feb 19 '13 at 13:35
  • @PowerPC: Talk about cracking a nut with a sledgehammer :-) – SuperMarioJava Feb 19 '13 at 13:41
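
The single-writer design suggested in the comments above could look roughly like the sketch below. It is only an illustration under stated assumptions: `FramedSender`, `send`, `shutdown` and the `STOP` marker are hypothetical names that do not appear in the test program, and the frame layout (4-byte length, 4-byte id, payload) is copied from the header used there. Worker threads never touch the socket's stream; they hand complete frames to a queue, and one thread writes them out one after another.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical helper (not part of the test program above): the only
// thread that ever writes to the socket's OutputStream.
final class FramedSender implements Runnable {
  // Sentinel frame, recognised by identity, that tells run() to stop.
  private static final byte[] STOP = new byte[0];

  private final BlockingQueue<byte[]> m_queue = new LinkedBlockingQueue<>();
  private final OutputStream m_os;

  FramedSender(OutputStream os) {
    m_os = os;
  }

  // Safe to call from any thread: builds one complete frame
  // (4-byte length, 4-byte id, payload) and queues it as a unit.
  void send(int iId, byte[] payload) throws InterruptedException {
    ByteBuffer frame = ByteBuffer.allocate(8 + payload.length);
    frame.putInt(payload.length).putInt(iId).put(payload);
    m_queue.put(frame.array());
  }

  // Queues the stop marker; frames queued before it are still written.
  void shutdown() throws InterruptedException {
    m_queue.put(STOP);
  }

  @Override
  public void run() {
    try {
      byte[] frame;
      // Frames leave the queue one at a time, so the bytes of two
      // different frames can never be mixed on the stream.
      while ((frame = m_queue.take()) != STOP)
        m_os.write(frame);
      m_os.flush();
    } catch (IOException e) {
      e.printStackTrace();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
```

In the test program, each SocketWriter would call `send(m_iId, data)` instead of `m_os.write(...)`, with one `new Thread(new FramedSender(socketOutputStream)).start()` created up front. A simpler alternative, if a queue is not wanted, is to wrap every write of a complete frame in `synchronized (m_os) { ... }`; the queue only adds the benefit that workers do not block while another thread's frame is being written.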

1 Answer


Yes. TCP is a byte-oriented stream protocol. That means that the application receives an (undelimited) stream of bytes. The concept of a "message" has to be provided by the application (or use a message-oriented protocol instead).
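
As a minimal sketch of how an application can supply the message concept itself, the reader below recovers length-prefixed frames from the byte stream. The names `FrameReader`, `Frame` and `readFrame`, and the `MAX_PAYLOAD` limit, are assumptions made for illustration; the header layout (4-byte length, then 4-byte id, big-endian) is the one used in the question's test program. The important point is that a single `read` may return fewer bytes than requested, so both header and payload are read with `readFully`, which loops until the requested count has arrived.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

// Hypothetical reader for frames of the form: int length, int id, payload.
final class FrameReader {
  static final int HEADER_SIZE = 8;
  static final int MAX_PAYLOAD = 64 * 1024; // assumed upper bound per frame

  static final class Frame {
    final int id;
    final byte[] payload;

    Frame(int id, byte[] payload) {
      this.id = id;
      this.payload = payload;
    }
  }

  // Returns the next frame, or null if the stream ended cleanly before a
  // new frame started. Throws EOFException if it ends in mid-frame.
  static Frame readFrame(InputStream is) throws IOException {
    // DataInputStream does no buffering of its own, so wrapping the
    // stream on every call does not swallow bytes of the next frame.
    DataInputStream in = new DataInputStream(is);
    int first = in.read();
    if (first < 0)
      return null;
    byte[] header = new byte[HEADER_SIZE];
    header[0] = (byte) first;
    in.readFully(header, 1, HEADER_SIZE - 1);
    ByteBuffer bb = ByteBuffer.wrap(header);
    int iLength = bb.getInt();
    int iId = bb.getInt();
    if (iLength < 0 || iLength > MAX_PAYLOAD)
      throw new IOException("invalid frame length " + iLength + " for id " + iId);
    byte[] payload = new byte[iLength];
    in.readFully(payload); // blocks until the whole payload has arrived
    return new Frame(iId, payload);
  }
}
```

A reader like this only works if the sender writes each frame without bytes from another frame in between, which is why concurrent writers on one socket need locking or a dedicated writer thread.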

Javier