1

Code below works and sends message at scheduled time but I think it isn't a good solution to open new socket every time timer executes scheduled task. What I would like is to open socket in run method only once and access it in SendMessage class whenever new instance of class is made in timer. That way it doesn't work, it only sends one message and then stops sending. Also I would be happy for some critics about code or tips for making it thread safe.

public class Client implements Runnable{

// Client Constructor here

@Override
public void run(){
    //SENDS ONLY ONE MESSAGE
    pitcherSocket = new Socket(InetAddress.getByName(hostname), port); 

    Timer timer = new Timer();

    timer.schedule(new SendMessage(), 0, 1000/mps);
}

private class SendMessage extends TimerTask{

    private int id;

    @Override
    public void run() {

        try
          {  // THIS WORKS FINE, SENDS MESSAGES AT SCHEDULED TIME                     
             pitcherSocket = new Socket(InetAddress.getByName(hostname), port); 

             OutputStream outToServer = pitcherSocket.getOutputStream();

             DataOutputStream out = new DataOutputStream(outToServer);

             out.writeInt(id);

             out.flush();

          }catch(IOException e)
          {
             e.printStackTrace();
          }
       }
    }
}

EDIT: WHOLE CODE

CLIENT

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;

public class Pitcher implements Runnable{

private int port;
private int mps;
private int size;
private String hostname;
private List<Integer> messageIds = Collections.synchronizedList(new     ArrayList<Integer>());
private Socket pitcherSocket;

//constatns, integer is 4 bytes, long is 8 bytes
private static final int INT_SIZE = 4;
private static final int LONG_SIZE = 8;


public Pitcher(int port, int mps, int size, String hostname) {

    this.port = port;
    this.mps = mps;
    this.size = size;
    this.hostname = hostname;
}

@Override
public void run(){

    System.out.println("Pitcher running...");
    System.out.println();

    Timer timer = new Timer();

    timer.schedule(new SendMessage(), 0, 1000/mps); 

    timer.schedule(new DisplayStatistics(), 0, 1000/mps);

}

//Nested class that sends messages
private class SendMessage extends TimerTask{

    private int numberOfSentMessages = 0;
    private int id;

    @Override
    public void run() {

        try {                         
             pitcherSocket = new Socket(InetAddress.getByName(hostname), port);

             OutputStream outToServer = pitcherSocket.getOutputStream();

             DataOutputStream out = new DataOutputStream(outToServer);

             //send message size
             out.writeInt(size);

             //message id is same as number of the sent message
             id = numberOfSentMessages + 1;
             out.writeInt(id);
             messageIds.add(id);



             //get system timestamp
             long currentTimestamp = System.currentTimeMillis();
             out.writeLong(currentTimestamp);

             //fill in the rest-
             byte[] rest = new byte[size - 2 * INT_SIZE - LONG_SIZE];     //message size(default 300 bytes) - size(4 bytes) - message id(4 bytse) - timestamp(8 bytes)
             out.write(rest);

             out.flush();

             numberOfSentMessages++;


             InputStream inFromServer = pitcherSocket.getInputStream();
             DataInputStream in = new DataInputStream(inFromServer);

             Integer catcherMessageSize = in.readInt();
             Integer catcherId = in.readInt();
             long catcherTimestamp = in.readLong();

             System.out.println("Sent message:     " + size + " " + id + " " + currentTimestamp + "...");
             System.out.println("Received message: " + catcherMessageSize + " " + catcherId + " " + catcherTimestamp + "...");
             System.out.println();

          }catch(IOException e)
          {
             e.printStackTrace();
          }

    }

}

}

SERVER

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

public class Catcher implements Runnable{

private int port;
private String bind;
private ServerSocket serverSocket;

//constatns, integer is 4 bytes, long is 8 bytes
private static final int INT_SIZE = 4;
private static final int LONG_SIZE = 8;

public Catcher(int port, String bind) {

    this.port = port;
    this.bind = bind;
}

@Override
public void run() {

    System.out.println("Catcher running...");
    System.out.println();

    try {
        serverSocket = new ServerSocket(port, 100, InetAddress.getByName(bind));
    } 
    catch (IOException e1) {
        e1.printStackTrace();
    }

    while(true){

         try
         {              
            Socket server = serverSocket.accept();

            DataInputStream in = new DataInputStream(server.getInputStream());

            Integer pitcherMessageSize = in.readInt();
            Integer pitcherId = in.readInt();
            long pitcherTimestamp = in.readLong();

            DataOutputStream out = new DataOutputStream(server.getOutputStream());

            //message id and size are sent back
            out.writeInt(pitcherMessageSize);
            out.writeInt(pitcherId);

            //send back current time
            long currentTimestamp = System.currentTimeMillis();
            out.writeLong(currentTimestamp);

            //fill in the rest
            byte[] rest = new byte[pitcherMessageSize - 2 * INT_SIZE - LONG_SIZE]; //message size(default 300 bytes) - size(4 bytes) - message id(4 bytes) - timestamp(8 bytes)
            out.write(rest);

            out.flush();

            System.out.println("Received message: " + pitcherMessageSize + " " + pitcherId + " " + pitcherTimestamp + "...");
            System.out.println("Sent message:     " + pitcherMessageSize + " " + pitcherId + " " + currentTimestamp + "...");
            System.out.println();

            //server.close();

         }
         catch(SocketTimeoutException s){
            System.out.println("Socket timed out!");
            break;
         }
         catch(IOException e){
            e.printStackTrace();
            break;
         }
      } 
}
}
asdf
  • 173
  • 1
  • 1
  • 11
  • are you seeing any exception ? when it stops sending does the program ends or is it just hanging ? – niceman Jul 09 '16 at 17:12
  • there is no exception, server receives first id and continues to listen. Client timer still executes SendMessage class and tries to send but nothind is send after first iteration – asdf Jul 09 '16 at 17:24
  • From what I see, you are creating and executing `SendMessage` on a scheduled basis. `SendMessage` attempts to reconnect via a `Socket` every time it's created. – Vince Jul 09 '16 at 17:57
  • Do you have any idea why it doesn't reconnect? – asdf Jul 09 '16 at 17:59
  • Show your server-side code. Are you terminating the connection and re-establishing every time? Or are you only doing it client-side? You never close the socket client-side aswell. I'm not sure why you would want to connect, send a message, disconnect, then reconnect. Am I misunderstanding something? – Vince Jul 09 '16 at 18:00
  • Server is always running. This is assignment I got as a port of a job application. What i need to do is open a client socket, send messages to server, server receives message, forms answer and then I receive that answer on client side again. I need to repeat that action until client is terminated. It needs to be repeated at "messages per second" rate – asdf Jul 09 '16 at 18:09
  • There is no code here that receives the message on the client side. – user207421 Jul 09 '16 at 18:10
  • I know, code was to big so I posted a small part. That code part is irrelevant, it behaves the same, with or without it. I just don't understant why I need to open a new socket repetedly, it seems logical to open it once and use it every time I call SendMessage class – asdf Jul 09 '16 at 18:15
  • Then `SendMessage` shouldn't be creating a new `Socket`. Instead, you should wrap the socket's streams in your `Client` class, then pass the wrappers to `SendMessage` so the streams can be used for communication (or really, just pass it the `OutputStream` wrapper, since `SendMessage` should only be sending data, based on it's identifier. In this case, you'll need a `ReadMessage` managing the `InputStream` wrapper). Do not attempt to create a connection every time. Instead, create it once, then re-use the same streams until the connection ends. – Vince Jul 09 '16 at 18:16
  • @VinceEmigh this is the exact reason I posted the question. I can't get that to work – asdf Jul 09 '16 at 18:21
  • EDITED: Posted whole code server and client. This works great, except i want to change it to create socket only once on client – asdf Jul 09 '16 at 18:21
  • @asdf did you find a solution? – user25 May 09 '18 at 21:21

3 Answers3

0

Have you thought about making both socket and the DataOutputStream member variables of SendMessage. This is some code to give you a rough start. You will probably want to put some enhancements like checking whether the socket is open and being able to create a new one if the current one is closed...

private class SendMessage extends TimerTask {
    private int id = 10;
    private Socket pitchSocket;
    private DataOutputStream out;

    public SendMessage(Socket socket) {
        this.pitchSocket = socket;
        try{
            out = new DataOutputStream(pitchSocket.getOutputStream());
        } catch(IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        try {  
            out.writeInt(id);
            out.flush();
        } catch(IOException e) {
            e.printStackTrace();
        }
    }
}
Hovercraft Full Of Eels
  • 283,665
  • 25
  • 256
  • 373
0

After being able to view the whole code I think that you definitely have some threading issues, though I think they are more heavily on the server side rather than the client side. Your server is single threaded. That means that you can only deal with one request at a time. You want a multithreaded server. I've refactored your code to create an example of Catcher which is multithreaded. I'm using the Thead class to do all of this which may be a little old fashioned. You might want to take a look at java.util.concurrent, they will probably have a more up to date.

package clientserver;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

public class Catcher implements Runnable{

private int port;
private String bind;
private ServerSocket serverSocket;



public Catcher(int port, String bind) {

    this.port = port;
    this.bind = bind;
}

@Override
public void run() {

    System.out.println("Catcher running...");
    System.out.println();

    try {
        serverSocket = new ServerSocket(port, 100, InetAddress.getByName(bind));
    } 
    catch (IOException e1) {
        e1.printStackTrace();
   }

    while(true){
         try
         {              
           new Thread(new CatcherHandler(serverSocket.accept())).start();
           Thread.sleep(1000);

        }
        catch(SocketTimeoutException s){
           System.out.println("Socket timed out!");
           break;
        }
        catch(IOException e){
           e.printStackTrace();
           break;
        } catch (InterruptedException e) {
           // TODO Auto-generated catch block
           e.printStackTrace();
       }
     } 
}

public static void main(String[] argv){
    new Thread( new Catcher(8093, "localhost")).start();;

}
}

class CatcherHandler implements Runnable{   
   Socket server;
   DataOutputStream out;
   DataInputStream in;

   private static final int INT_SIZE = 4;
   private static final int LONG_SIZE = 8;

   public CatcherHandler(Socket server) {
       super();
       this.server = server;
       try {
           in = new DataInputStream(server.getInputStream());
          out = new DataOutputStream(server.getOutputStream());

       } catch (IOException e) {
           // TODO Auto-generated catch block
           e.printStackTrace();
       }

   }



   @Override
   public void run() {
       try{
           if(in.available() > 0){

              Integer pitcherMessageSize = in.readInt();
               Integer pitcherId = in.readInt();
               long pitcherTimestamp = in.readLong();

               //message id and size are sent back
               out.writeInt(pitcherMessageSize);
               out.writeInt(pitcherId);

               //send back current time
               long currentTimestamp = System.currentTimeMillis();
               out.writeLong(currentTimestamp);

               //fill in the rest
               byte[] rest = new byte[pitcherMessageSize - 2 * INT_SIZE - LONG_SIZE]; //message size(default 300 bytes) - size(4 bytes) - message id(4 bytes) - timestamp(8 bytes)
               out.write(rest);

               out.flush();

               System.out.println("Received message: " + pitcherMessageSize + " " + pitcherId + " " + pitcherTimestamp + "...");
               System.out.println("Sent message:     " + pitcherMessageSize + " " + pitcherId + " " + currentTimestamp + "...");
               System.out.println();
               Thread.sleep(1000);

           }
       } catch (IOException e) {
           // TODO Auto-generated catch block
           e.printStackTrace();
       } catch (InterruptedException e) {
           // TODO Auto-generated catch block
           e.printStackTrace();
       }finally{}
       //server.close();

   }
} 

Additionally I refactored your client to be able to use one socket and be tread safe. Now SendMessage takes in a DataInputStream and a DataOutputSteam as it's arguments.

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;

public class Pitcher implements Runnable{

private int port;
private int mps;
private int size;
private String hostname;
private List<Integer> messageIds = Collections.synchronizedList(new     ArrayList<Integer>());
private Socket pitcherSocket;
private DataOutputStream out;
private DataInputStream in;

//constatns, integer is 4 bytes, long is 8 bytes
private static final int INT_SIZE = 4;
private static final int LONG_SIZE = 8;


public Pitcher(int port, int mps, int size, String hostname) {

    this.port = port;
    this.mps = mps;
    this.size = size;
    this.hostname = hostname;



    try {
        this.pitcherSocket = new Socket(InetAddress.getByName(hostname), port);
        out = new DataOutputStream(pitcherSocket.getOutputStream());
        in = new DataInputStream(pitcherSocket.getInputStream());
    } catch (IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }


}

public static void main(String[] argv) throws Exception{
    for(int i = 0; i < 10; i++){
        new Thread(new Pitcher(8093, 1, 200, "localhost")).start();
        Thread.sleep(1000);
    }

    Thread.sleep(10000);
}

@Override
public void run(){

    System.out.println("Pitcher running...");
    System.out.println();

    Timer timer = new Timer();

    timer.schedule(new SendMessage(out, in), 0, 1000); 

    //timer.schedule(new DisplayStatistics(), 0, 1000);

}

//Nested class that sends messages
private class SendMessage extends TimerTask{

    private int numberOfSentMessages = 0;
    private int id;
    private DataOutputStream out;
    private DataInputStream in;

    public SendMessage(DataOutputStream out, DataInputStream in){
        this.out = out;
        this.in = in;
    }

    @Override
    public void run() {

        try {                         
            long currentTimestamp = 0L;
            synchronized(out){
                 //send message size
                 out.writeInt(size);

                 //message id is same as number of the sent message
                 id = numberOfSentMessages + 1;
                 out.writeInt(id);
                 messageIds.add(id);



                 //get system timestamp
                 currentTimestamp = System.currentTimeMillis();
                 out.writeLong(currentTimestamp);

                 //fill in the rest-
                 byte[] rest = new byte[size - 2 * INT_SIZE - LONG_SIZE];     //message size(default 300 bytes) - size(4 bytes) - message id(4 bytse) - timestamp(8 bytes)
                 out.write(rest);

                 out.flush();
            }
             numberOfSentMessages++;

             long catcherTimestamp = 0L;
             Integer catcherMessageSize;
             Integer catcherId;
             synchronized(in){
                 catcherMessageSize = in.readInt();
                 catcherId = in.readInt();
                 catcherTimestamp = in.readLong();
             }
             System.out.println("Sent message:     " + size + " " + id + " " + currentTimestamp + "...");
             System.out.println("Received message: " + catcherMessageSize + " " + catcherId + " " + catcherTimestamp + "...");
             System.out.println();
             Thread.sleep(1000);

          }catch(IOException e)
          {
             e.printStackTrace();
          } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

    }

}

}
  • Thank you very much! I'll try to implement this code tomorrow and let you know if everything is fine. Can you please tell me why are you using thread.sleep(X)? I'm new to threads. – asdf Jul 09 '16 at 22:06
  • Yeah, I'm using sleep to allow other threads run. Basically, when you call the sleep method it allows the scheduler to allow other threads to run. – CodeJockNYC Jul 09 '16 at 22:43
  • I'm having another problem. When pitcher sends message, catchers System.currentTimeMillis(); is smaller than pitchers. – asdf Jul 10 '16 at 11:49
  • Are the Catcher and Pitcher installed on two different machines? If so and there both running *nix, open two terminals and type 'date' into the terminal window. The time date might be off on both those machines. I believe you should be able to use some external time sync service to get the proper time. This is assuming you have admin access to both of your test boxes. If this is the issue and you have admin rights check out this [link](http://unix.stackexchange.com/questions/91792/what-is-the-recommended-way-to-synchronize-time-using-ntp). Good Luck! – CodeJockNYC Jul 10 '16 at 12:43
-3

The Java Socket class is not thread safe. To have multiple threads access the same Socket object you would need to synchronize their actions. This could be done by providing all your SendMessage-threads with a common object that would then act as a lock. You would need an object for each socket operation you are planning to use (e.g. read and write). Then, refactor every action that does a call to the Socket object into separate methods and synchronize them around that object. E.g. for the read operation you could have a method called read() inside SendMessage that calls Socket.read and synchronize this method around the lock-object for read.

private class SendMessage extends TimerTask{

    private Object readLock; 
    private Socket socket;

    public SendMessage(Object readLock, Socket socket) {
        this.readLock = readLock;
        this.socket = socket;
    }

    public void readFromSocket() {
         synchronized(readLock) {
              socket.read();
         }
    }

    @Override  
    public void run() {
        readFromSocket();
        // do other stuff
    }

}
Hovercraft Full Of Eels
  • 283,665
  • 25
  • 256
  • 373
Soggiorno
  • 760
  • 9
  • 17
  • This does not explain the cause of his problem. – Hovercraft Full Of Eels Jul 09 '16 at 17:25
  • The problem only arises when he uses the same socket object in multiple threads. Thus, it is most likely due to socket not being thread safe. Please elaborate on why you don't think that is the cause of the problem. – Soggiorno Jul 09 '16 at 17:28
  • Relevant: http://stackoverflow.com/questions/13545578/is-java-socket-multi-thread-safe – Soggiorno Jul 09 '16 at 17:29
  • But a threading error wouldn't cause the misbehavior that he's seeing. – Hovercraft Full Of Eels Jul 09 '16 at 17:30
  • CodeJockNYC's solution is closer on the money as it avoids the re-creation of the socket which the OP is doing in his code. – Hovercraft Full Of Eels Jul 09 '16 at 17:32
  • So does mine. Any solution that passes the socket as parameter in SendMessage does that. OP tried this himself and it didn't work. That's because the socket object gets used by multiple SendMessage threads this way and since socket is not thread safe, the error occurs. To properly utilize the same single socket for all SendMessage threads you need to synchronize the operations of that object. – Soggiorno Jul 09 '16 at 17:36
  • 1
    There is no code in the question that uses the same socket in two threads. – user207421 Jul 09 '16 at 17:47
  • When the socket is created outside SendMessage and accessed by every new instance of SendMessage, the socket is used across multiple threads. Every SendMessage object represents a thread. This is what OP does in line 7 of his code. – Soggiorno Jul 09 '16 at 17:52
  • The socket is used across multiple threads created successively by the Timer, *sequentially*. Not concurrently. And the statement that Java sockets are not thread-safe is simply false. – user207421 Jul 09 '16 at 18:00
  • While that might be true, OP specifically asked how to make his solution thread safe. I assume this is because he wants to spawn multiple SendMessage-threads eventually. That's why I addressed that, as well as the original problem of not creating a new socket for each thread. – Soggiorno Jul 09 '16 at 19:50