
This is my code. It is supposed to implement a way of synchronizing calls in a distributed system. This is the part where I send a message and wait for ACKs. The network and the peers are assumed to be reliable.

My problem, though, is with local synchronization: the code keeps raising this exception and I cannot understand why.

Exception in thread "Thread-7" java.lang.IllegalMonitorStateException
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Object.java:503)
    at distributed.PeerManager.sendAllWithAck(PeerManager.java:212)
    at distributed.TokenManager.onTokenReceived(TokenManager.java:67)
    at communication.TokenMessage.execute(TokenMessage.java:21)
    at distributed.ListenThread.run(ListenThread.java:38)
Exception in thread "Thread-8" java.lang.IllegalMonitorStateException
    at java.lang.Object.notify(Native Method)
    at distributed.AckWaiter.run(AckWaiter.java:74)

Here's the AckWaiter class:

public class AckWaiter extends Thread {
    public int counter;
    PeerManager pm;
    Message m;
    public Object waiter;
    public int parentPort;

    public AckWaiter(PeerManager pm, Message m, int n,int parentPort) {
        counter = n;
        this.pm = pm;
        this.m = m;
        this.parentPort=parentPort;
        waiter=new Object();
    }

    public synchronized void notifyAck() {
        counter--;
        notify();
    }

    @Override
    public synchronized void run(){
        pm.sendAllExceptMe(m);

        try {
            BufferedReader inFromClient=new BufferedReader(new
                    InputStreamReader(pm.listener.socketMap.get(parentPort).getInputStream()));
            while(counter>0){
                wait(100);
                Message m=CustomMarshaller.getCustomMarshaller().unmarshal(inFromClient.readLine());
                if(m==null){
                    continue;
                }
                m.execute(pm);
            }
        } catch (IOException | InterruptedException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        }


        waiter.notify();
        return;
    }
}

And here is the call that is supposed to block the thread:

public synchronized void sendAllWithAck(Message m){
    if(aw!=null){
        throw new RuntimeException();
    }
    aw=new AckWaiter(this,m,connectionList.size()-1,m.sender.getPort());
    aw.start();
    try {
        aw.waiter.wait();
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }

    aw=null;
}

'sendAllWithAck' should start an AckWaiter thread, then wait on the object 'waiter', which will be notified once I have received 'n' ACKs.

The 'notifyAck' method is called by the communication layer when an ACK message is received.

AckWaiter is a thread of its own because 'sendAllWithAck' could be called by a thread in charge of reading from a socket. I have n peers, and every peer has an open socket (with a thread handling inbound messages) for every other peer. So if I waited for ACKs inside the thread that received the message, I would not be able to read the ACK from that one peer. (parentPort is the identifier of that peer, so I can periodically check for its ACKs inside this thread.)

I'm open to modifying my architecture if this exception is due to a structural problem, but I have no idea how to handle it differently without modifying my communication layer, and that would be a pain.

Chobeat

1 Answer

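The methods wait(), notify() and notifyAll() must be called from inside a synchronized block or method that holds the monitor of the same object they are invoked on. Also, the run method is only accessed by one thread, so it doesn't need to be synchronized. Note that the first exception was thrown in ListenThread.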
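
To make the rule concrete, here is a minimal sketch. It is not the asker's code: the class `MonitorDemo` and its members are made-up names, and `waiter` only plays the role of `AckWaiter.waiter`. A `synchronized` method holds the monitor of `this`, so invoking `waiter.notify()` or `waiter.wait()` from it throws `IllegalMonitorStateException` unless the thread also holds the monitor of `waiter`.

```java
public class MonitorDemo {
    // Hypothetical lock object, standing in for AckWaiter.waiter.
    private final Object waiter = new Object();
    private boolean done = false; // guarded by waiter's monitor

    // Wrong: this method locks `this`, but notify() is invoked on `waiter`.
    // Returns true if the expected exception was thrown.
    public synchronized boolean brokenNotify() {
        try {
            waiter.notify();
            return false;
        } catch (IllegalMonitorStateException e) {
            return true; // the current thread does not own waiter's monitor
        }
    }

    // Right: hold waiter's monitor while changing the state and notifying.
    public void signalDone() {
        synchronized (waiter) {
            done = true;
            waiter.notifyAll();
        }
    }

    // Right: hold waiter's monitor while waiting, and re-check the
    // condition in a loop to cope with spurious or early wake-ups.
    public void awaitDone() throws InterruptedException {
        synchronized (waiter) {
            while (!done) {
                waiter.wait();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MonitorDemo demo = new MonitorDemo();
        System.out.println("broken call threw: " + demo.brokenNotify());
        Thread signaller = new Thread(demo::signalDone);
        signaller.start();
        demo.awaitDone();
        signaller.join();
        System.out.println("finished waiting");
    }
}
```

The `done` flag matters: if the notification arrives before the waiting thread reaches `wait()`, the flag lets the waiter skip waiting instead of blocking forever.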

Vladimir Vagaytsev
Dejan
  • they are all in synchronized methods – Chobeat May 26 '13 at 11:27
  • You don't synchronize on your `waiter` when calling notify/wait. You synchronize on your own object (`this`), which is what the `synchronized` keyword on a method means. – Thomas Jungblut May 26 '13 at 11:41
  • I know how synchronization works but I still can't understand why this matters. I have to wait on 2 different objects because one means "wait until an ack is received so that I can evaluate how many acks are left" and the other, when I wait on 'waiter', means "wait until I've received all the acks" – Chobeat May 26 '13 at 11:53
  • Just check which object the synchronization is on and, if needed, change it to the one on which all synchronization is done. Use a debugger if the stack trace is not understandable. – Dejan May 26 '13 at 12:03
  • You're not helping because you're not explaining the reason why the exception is raised. You're just throwing out statements that I can't connect to my problem. – Chobeat May 26 '13 at 12:05
  • @Chobeat you don't know how it works. Get yourself a good book (Java Concurrency in Practice is the best read imho) and you may be interested in http://stackoverflow.com/questions/2779484/why-wait-should-always-be-in-synchronized-block for your specific question. – Thomas Jungblut May 26 '13 at 12:08
  • I know how it works. I studied how it works at uni, I studied how it works in the Oracle docs and I've used object sync like a thousand times in Java. Still, I can't understand what that dude is saying. – Chobeat May 26 '13 at 12:10
  • I'm gonna check that post anyway, maybe it will help – Chobeat May 26 '13 at 12:10
  • OK, got what you were saying. Now the exception is not raised anymore, but it's still not working. I need some time to check. Thanks for now – Chobeat May 26 '13 at 12:13
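
For the "block until n ACKs have arrived" part described in the question, `java.util.concurrent.CountDownLatch` is one possible alternative that avoids hand-written wait/notify. The following is only a sketch under assumptions: `AckLatch` and its methods are hypothetical names, not part of the asker's code, and it assumes the communication layer can call `notifyAck()` on a shared object for each ACK it receives.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class AckLatch {
    private final CountDownLatch latch;

    // expectedAcks must be >= 0, otherwise CountDownLatch throws
    // IllegalArgumentException.
    public AckLatch(int expectedAcks) {
        latch = new CountDownLatch(expectedAcks);
    }

    // Called once per ACK; safe to call from any thread.
    public void notifyAck() {
        latch.countDown();
    }

    // Blocks until every expected ACK was counted or the timeout elapsed.
    // Returns true if all ACKs arrived in time, false on timeout.
    public boolean awaitAll(long timeout, TimeUnit unit) throws InterruptedException {
        return latch.await(timeout, unit);
    }

    public static void main(String[] args) throws InterruptedException {
        AckLatch acks = new AckLatch(3);
        // Simulate three peers acknowledging from their own threads.
        for (int i = 0; i < 3; i++) {
            new Thread(acks::notifyAck).start();
        }
        System.out.println("all acks received: " + acks.awaitAll(1, TimeUnit.SECONDS));
    }
}
```

This covers only the counting and blocking. It does not address the concern raised in the question that the blocked thread may also be the one that has to read the ACK from its socket.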