1

I have a server that receives various XML messages from clients (one thread per client) and routes them to different functions depending on the message type. For example, if the first element in a message contains the string 'login', it is a login message and gets routed to the login() function.

Anyway, I want to make this message routing thread safe so that things don't get messed up if multiple clients are connected and the dispatcher switches threads in the middle of routing a message. Here is how I am routing the messages:

public void processMessagesFromClient(Client client)
{
    Document message;

    while (true)
    {
        try
        {
            // readObject() returns Object, so cast to the expected type
            message = (Document) client.inputStream.readObject();

            /*
             * Determine the message type
             */
            String messageType = getMessageType(message);

            // Route the message depending on its type
            switch (messageType)
            {
                case LOGIN:
                    userModel.handleLogin();
                ...
                ...
                ...
                etc...
            }
        } catch(Exception e) {}
    }
}

So how can I make this thread safe? I figure I need to put a synchronized statement in somewhere, but I'm not sure where. I've also been reading around on the subject and found this post, which says there is an issue with synchronizing on 'this': https://stackoverflow.com/a/416198/1088617

And another post here, which says singletons aren't suitable for synchronizing on (my class in the code above is a singleton): https://stackoverflow.com/a/416202/1088617

Jim_CS
  • Where are you starting the new thread? – GavinCattell Apr 01 '12 at 19:24
  • Well, that function above is used by all clients. I start a thread for each client and, after first retrieving the input/output streams, I call the processMessagesFromClient() method above, which loops infinitely. So all clients will be sharing the 'message' variable. They will also all be sharing the userModel, as that is also a singleton. When I receive a message from a client, I presume I should be blocking other threads until I've handled that message? – Jim_CS Apr 01 '12 at 19:37

7 Answers

2

I would actually have a message handler thread which is responsible for reading incoming messages. This will then hand off processing to a worker thread to do the time consuming processing of the message. You can use the Java ThreadPoolExecutor to manage this.
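A rough sketch of that hand-off, under some assumptions: messages are reduced to plain strings, `route` is a hypothetical stand-in for the real routing logic, and the pool comes from `Executors.newFixedThreadPool`, which is backed by a `ThreadPoolExecutor`. The class name `MessageHandOffSketch` is made up for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class MessageHandOffSketch {
    // Hypothetical stand-in for the time-consuming processing of one message.
    static String route(String messageType) {
        return "handled:" + messageType;
    }

    // Hands every message to a fixed-size worker pool and waits for all of them.
    static List<String> processAll(List<String> messageTypes, int workers) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        List<String> results = new CopyOnWriteArrayList<>(); // safe for concurrent adds
        for (String type : messageTypes) {
            pool.execute(() -> results.add(route(type)));
        }
        pool.shutdown(); // accept no new tasks, let queued ones finish
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return results;
    }

    public static void main(String[] args) {
        // The order of the results depends on thread scheduling.
        System.out.println(processAll(List.of("login", "logout", "chat"), 2));
    }
}
```

Note that the worker tasks still run concurrently, so anything they share (such as the userModel) would need its own protection.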

Andrew Fielden
2

Your class is already thread safe, because you are only using local variables. Thread safety only comes into play when you access class state (ie fields), which your code doesn't (seem to) do.
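To illustrate the point about local variables, here is a minimal sketch (the class and method names are hypothetical, not from the question). Several threads call the same method at once, and each one still sees only its own `message`, because a local variable belongs to the call that created it and is never shared between threads.

```java
class LocalVariableSketch {
    // 'message' is local: every call, on every thread, gets its own copy.
    static String echo(String input) {
        String message = input;
        Thread.yield(); // invite a thread switch between the write and the read
        return message;
    }

    // Runs 'threads' workers that all call echo() concurrently.
    static String[] runConcurrently(int threads) {
        String[] seen = new String[threads];
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            final int id = i;
            workers[i] = new Thread(() -> {
                for (int n = 0; n < 1_000; n++) {
                    String result = echo("client-" + id);
                    if (!result.equals("client-" + id)) {
                        throw new IllegalStateException("saw another thread's message");
                    }
                    seen[id] = result; // each thread writes only its own slot
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(String.join(", ", runConcurrently(4)));
    }
}
```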

What you are talking about is serialization - you want to funnel all message processing through one point to guarantee that message processing is one-at-a-time (starts and finishes atomically). The solution is simple: Employ a static synchronized method:

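public void processMessagesFromClient(Client client) {
    while (true) {
        processMessage(client);
    }
}

// Static synchronized: the lock is the Class object, so only one thread at a
// time can be inside. Being static, it can only use static members, so
// userModel and getMessageType() must be static or reached via the singleton.
private static synchronized void processMessage(Client client) {
    try {
        // 'message' is a local variable, so each call has its own copy
        Document message = (Document) client.inputStream.readObject();

        String messageType = getMessageType(message);

        // Route the message depending on its type
        switch (messageType) {
            case LOGIN:
                userModel.handleLogin();
            ...
            etc...
        }
    } catch(Exception e) {}
}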

FYI static synchronized methods use the Class object as the lock. This code will make your code behave like a single thread, which your question seems to want.

Bohemian
  • Cheers mate, I had forgotten that local variables are thread safe. – Jim_CS Apr 01 '12 at 19:49
  • 2
    Better be careful when using this code. What do you expect from "message = client.inputStream.readObject()"? It may be a blocking call: it will wait until the current client sends a message. In that case, processing for all clients will stop while the server is waiting for a message from one particular client. Brian_CS : could you clarify if "readObject()" waits for the message? – Arnost Valicek Apr 01 '12 at 20:01
  • Yes, it waits for the message; inputStream is an ObjectInputStream. – Jim_CS Apr 02 '12 at 11:37
1

Provided you have one of these objects per thread, you don't have a problem. You only need to synchronize access to a shared object that can be modified by one of the threads.

public void processMessagesFromClient(Client client) {    
    while (true) {
        processMessage(client);
    }
}

private void processMessage(Client client) {
    try {
        Document message = (Document) client.inputStream.readObject();

        String messageType = getMessageType(message);

        // Route the message depending on its type
        switch (messageType) {
            case LOGIN:
                userModel.handleLogin();
            ...
            etc...
        }
    } catch(Exception e) {}
}
Peter Lawrey
  • But he said that the class in the code was a singleton. So presumably he's got some data structures that are shared between threads. – Andrew Fielden Apr 01 '12 at 19:30
  • Well if the code above is executing and the 'message' variable gets set for a particular client (Client 1)...then if the dispatcher interrupts and lets a thread for another client (Client 2) run can't the 'message' variable get overwritten by this new client? And when it switches back to Client 1 the 'message' variable could now contain data related to Client 2? – Jim_CS Apr 01 '12 at 19:32
  • You can't interrupt an ObjectInputStream. You have to use one thread per connection, as you suggested, which means you can't use a dispatcher that uses a shared thread for multiple connections. Can you clarify what you are doing? – Peter Lawrey Apr 01 '12 at 19:36
  • By dispatcher I don't mean some Java-related Dispatcher object; I mean the operating system or JVM could interrupt one client thread and start executing another one right in the middle of my message routing. If Client 1's thread is executing and the 'message' variable gets set, the Client 1 thread could be interrupted and Client 2's thread would then overwrite the data in the 'message' variable, so when Client 1's thread starts executing again it will contain invalid data... – Jim_CS Apr 01 '12 at 19:41
  • 1
    Your method is synchronized so only one thread could enter it. I wouldn't do it that way, instead I would have one `message` per thread because there is no need to have this problem in the first place. – Peter Lawrey Apr 01 '12 at 19:46
1

If you already have 1 thread per connection, then the only thing that you have to synchronize are the functions which handle the events (i.e. functions like userModel.handleLogin()).
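A minimal sketch of that idea, assuming the shared model keeps some mutable state. The `UserModelSketch` class, its field, and the `handleLogin(String)` signature are hypothetical stand-ins, since the question does not show the real userModel.

```java
import java.util.HashSet;
import java.util.Set;

class UserModelSketch {
    // Shared mutable state; HashSet is not thread safe on its own.
    private final Set<String> loggedIn = new HashSet<>();

    // Only one client thread at a time may run a synchronized method
    // on the same UserModelSketch instance.
    synchronized void handleLogin(String user) {
        loggedIn.add(user);
    }

    synchronized int loginCount() {
        return loggedIn.size();
    }

    // Starts one thread per simulated client, each logging in distinct users.
    static int simulate(int clients, int loginsPerClient) {
        UserModelSketch model = new UserModelSketch();
        Thread[] threads = new Thread[clients];
        for (int i = 0; i < clients; i++) {
            final int id = i;
            threads[i] = new Thread(() -> {
                for (int n = 0; n < loginsPerClient; n++) {
                    model.handleLogin("user-" + id + "-" + n);
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return model.loginCount();
    }

    public static void main(String[] args) {
        System.out.println(simulate(8, 1_000));
    }
}
```

With this layout the blocking `readObject()` call stays outside any lock, so a slow client only delays its own thread.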

Vincent Cantin
1

I guess the best solution would be to use a thread-safe queue, such as ConcurrentLinkedQueue or a BlockingQueue, and a single worker thread that picks up these values and runs the actions one by one.
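One possible shape for this, sketched with `LinkedBlockingQueue` from `java.util.concurrent`. Messages are plain strings here, and the class name, the `STOP` marker, and the `run` method are assumptions made for the example, not part of the question's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class SingleWorkerQueueSketch {
    private static final String STOP = "__stop__"; // marker that tells the worker to exit

    static void join(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Client threads enqueue messages; one worker handles them one at a time.
    static List<String> run(int clients, int messagesPerClient) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> handled = new ArrayList<>(); // touched only by the worker
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    String message = queue.take(); // blocks until a message arrives
                    if (message.equals(STOP)) {
                        break;
                    }
                    handled.add(message); // stand-in for the real handler call
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();

        Thread[] producers = new Thread[clients];
        for (int i = 0; i < clients; i++) {
            final int id = i;
            producers[i] = new Thread(() -> {
                for (int n = 0; n < messagesPerClient; n++) {
                    queue.add("client-" + id + ":" + n); // unbounded queue, never blocks
                }
            });
            producers[i].start();
        }
        for (Thread p : producers) {
            join(p);
        }
        queue.add(STOP); // enqueued after all client messages
        join(worker);
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(run(3, 2).size() + " messages handled");
    }
}
```

Because only the worker touches the handlers, they need no locking of their own, and a client blocked in a read does not hold up the others.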

0

You need to know which resource should be used by only one thread at a time.

In your case it is likely that reading the next message needs to be protected.

 synchronized (lock) {
      message = client.inputStream.readObject();
 }

However, your code sample does not really show what needs to be protected against concurrent access.

stefan bachert
  • Yes, I believe 'message' needs to be protected. And the userModel also needs to be protected, as it will be loading data related to the current client. I just want to block all other client threads from executing until I have handled the current message. So would it be OK to wrap everything in the 'try' statement in a synchronized block? – Jim_CS Apr 01 '12 at 19:46
  • @stefan bachert : As you said, there is not enough information... But assuming that inputStream is a network stream and readObject() may block, I would say that it's not a good idea to put this call into a synchronized block. If one of the threads is blocking on "readObject()", all other threads can't read any already available messages. Do you think this may be an issue? – Arnost Valicek Apr 01 '12 at 20:15
  • Synchronizing anything which is already synchronized could lead to a deadlock. I see your point. – stefan bachert Apr 01 '12 at 20:33
0

The method itself is thread safe. However, since your class is a singleton, you might want to use double-checked locking in your getInstance() to ensure thread safety. Also make sure the instance field is static.

      class Foo {
            private static volatile Foo instance = null;
            public static Foo getInstance() {
                if (instance == null) 
                {
                    synchronized (Foo.class) // no 'this' in a static method, so lock on the class
                    {
                        if (instance == null)
                            instance = new Foo ();
                    }
                }
                return instance ;
            }
        }
SiN