This is my code. It is supposed to implement a way of synchronizing calls in a distributed system. This is the part where I send a message and wait for ACKs. The network and the peers are assumed to be reliable.
My problem, though, is with local synchronization: the code keeps throwing these exceptions and I cannot understand why.
Exception in thread "Thread-7" java.lang.IllegalMonitorStateException
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Object.java:503)
    at distributed.PeerManager.sendAllWithAck(PeerManager.java:212)
    at distributed.TokenManager.onTokenReceived(TokenManager.java:67)
    at communication.TokenMessage.execute(TokenMessage.java:21)
    at distributed.ListenThread.run(ListenThread.java:38)
Exception in thread "Thread-8" java.lang.IllegalMonitorStateException
    at java.lang.Object.notify(Native Method)
    at distributed.AckWaiter.run(AckWaiter.java:74)
Here's the AckWaiter class:
public class AckWaiter extends Thread {
    public int counter;
    PeerManager pm;
    Message m;
    public Object waiter;
    public int parentPort;

    public AckWaiter(PeerManager pm, Message m, int n, int parentPort) {
        counter = n;
        this.pm = pm;
        this.m = m;
        this.parentPort = parentPort;
        waiter = new Object();
    }

    public synchronized void notifyAck() {
        counter--;
        notify();
    }

    @Override
    public synchronized void run() {
        pm.sendAllExceptMe(m);
        try {
            BufferedReader inFromClient = new BufferedReader(new
                InputStreamReader(pm.listener.socketMap.get(parentPort).getInputStream()));
            while (counter > 0) {
                wait(100);
                Message m = CustomMarshaller.getCustomMarshaller().unmarshal(inFromClient.readLine());
                if (m == null) {
                    continue;
                }
                m.execute(pm);
            }
        } catch (IOException | InterruptedException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        }
        waiter.notify();
        return;
    }
}
And here is the call that is supposed to block the calling thread:
public synchronized void sendAllWithAck(Message m) {
    if (aw != null) {
        throw new RuntimeException();
    }
    aw = new AckWaiter(this, m, connectionList.size() - 1, m.sender.getPort());
    aw.start();
    try {
        aw.waiter.wait();
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
    aw = null;
}
'sendAllWithAck' should start an AckWaiter thread and then wait on the object 'waiter', which is supposed to be notified once I have received 'n' ACKs.
The 'notifyAck' method is called by the communication layer when an ACK message is received.
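For reference, here is a minimal self-contained sketch of the handshake described above. It is written so that wait() and notifyAll() are only called while holding the monitor of the same object, since calling either without owning that object's monitor is what raises IllegalMonitorStateException. The names AckGate, awaitAll and pending are hypothetical and not part of the code above, and the ACKs are simulated by plain threads instead of sockets.

```java
// Hypothetical helper: counts down expected ACKs and lets one thread block until all arrived.
final class AckGate {
    private final Object lock = new Object(); // the single monitor used for both wait and notify
    private int pending;

    AckGate(int expectedAcks) {
        this.pending = expectedAcks;
    }

    // Called by whichever thread receives an ACK.
    void notifyAck() {
        synchronized (lock) {          // must own lock's monitor before notifyAll()
            pending--;
            if (pending <= 0) {
                lock.notifyAll();
            }
        }
    }

    // Blocks the caller until every expected ACK has been counted.
    void awaitAll() throws InterruptedException {
        synchronized (lock) {          // must own lock's monitor before wait()
            while (pending > 0) {      // loop guards against spurious wakeups
                lock.wait();
            }
        }
    }

    int pending() {
        synchronized (lock) {
            return pending;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AckGate gate = new AckGate(3);
        for (int i = 0; i < 3; i++) {
            new Thread(gate::notifyAck).start(); // simulated peers sending one ACK each
        }
        gate.awaitAll();
        System.out.println("all ACKs received, pending = " + gate.pending());
    }
}
```

The point of the sketch is only that the object inside synchronized (...) is the same object that wait() and notifyAll() are invoked on; a synchronized method locks 'this', not some other field such as 'waiter'.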
AckWaiter is a thread of its own because 'sendAllWithAck' could be called by a thread that is in charge of reading from a socket. I have n peers, and every peer has an open socket (with a thread handling inbound messages) for every other peer. If I waited for ACKs in response to a received message on that same reader thread, I would not be able to read the ACK from that one peer. ('parentPort' is the identifier of that peer, so that I can periodically check for its ACKs inside the AckWaiter thread.)
I'm open to modifying my architecture if this exception is due to a structural problem, but I have no idea how to handle it differently without modifying my communication layer, and that would be a pain.
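One alternative that might avoid hand-written wait/notify altogether is java.util.concurrent.CountDownLatch. The sketch below is only an illustration under assumptions: LatchAckSketch and sendAndAwait are hypothetical names, the peers are simulated by threads, and nothing here touches the real communication layer.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the "send to all peers, then block until n ACKs" step.
final class LatchAckSketch {

    // Returns true if all ACKs arrived before the timeout, false otherwise.
    static boolean sendAndAwait(int peers, long timeoutMs) throws InterruptedException {
        CountDownLatch acks = new CountDownLatch(peers);
        for (int i = 0; i < peers; i++) {
            // Simulated peer: in a real system the ACK handler would call countDown().
            new Thread(acks::countDown).start();
        }
        // No synchronized block is needed; the latch handles its own locking.
        return acks.await(timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        boolean ok = sendAndAwait(3, 2000);
        System.out.println("all ACKs received: " + ok);
    }
}
```

In this shape the equivalent of 'notifyAck' would just call countDown() on the latch, and the timed await gives the blocked caller a way out if an ACK never arrives.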