0

Here I have two run methods which should synchronize each other.

Poller Class:

     */
public void run() {
    int seqId = 0;
    while(true) {
    List<KpiMessage> list = null;

        try{
            if(!accumulator.isUsed){                
                try {
                    list = fullPoll(seqId);

                    if (!list.isEmpty()) {
                        seqId = list.get(0).getSequence();
                        accumulator.manageIngoing(list);
                    }
                    System.out.println("Updated");                      
                    wait(); 
                } catch (Exception e1) {
                    e1.printStackTrace();

                }
            }

        } catch (Exception e){
            // TODO:
            System.err.println(e.getMessage());
            e.printStackTrace();                
        }
    }

}


/**
 * Method which defines polling of the database and also count the number of Queries
 * @param lastSeq 
 * @return pojo col
 * @throws Exception
 */
public List<KpiMessage> fullPoll(int lastSeq) throws Exception {
    Statement st = dbConnection.createStatement();
    System.out.println("Polling");
    ResultSet rs = st.executeQuery("Select * from msg_new_to_bde where ACTION = 814 and 
    STATUS = 200 order by SEQ DESC");
    List<KpiMessage> pojoCol = new ArrayList<KpiMessage>();
    try {


        while (rs.next()) {
            KpiMessage filedClass = convertRecordsetToPojo(rs);
            pojoCol.add(filedClass);
        }

        for (KpiMessage pojoClass : pojoCol) {
            System.out.print(" " + pojoClass.getSequence());
            System.out.print(" " + pojoClass.getTableName());
            System.out.print(" " + pojoClass.getAction());
            System.out.print(" " + pojoClass.getKeyInfo1());
            System.out.print(" " + pojoClass.getKeyInfo2());
            System.out.print(" "+ pojoClass.getStatus());
            System.out.println(" " + pojoClass.getEntryTime());

        }


    } finally  {
        try {
            st.close();
            rs.close();
        } catch (Exception e) {
            System.err.println(e.getMessage());
            e.printStackTrace();
        }
    }       

Processing and Updating Class:

        public void run() {
    while(true){
        try {
            while(!accumulator.isUsed)
            {
                try {
                System.out.println("Waiting for new outgoingmessages"); 
                    this.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
     Collection<KpiMessage> outgoingQueue = generate(accumulator.outgoingQueue); 
            accumulator.manageOutgoing(outgoingQueue, dbConnection);
        } catch (Exception e) {
            System.err.println(e.getMessage());
            e.printStackTrace();
        }
    }
}
}   

I have a logical error:

The poller is polling not only for new messsage but also reads the DB again and again from the first.

Also Updates again and again.

How to solve this synchronization problem.

Babu
  • 299
  • 1
  • 3
  • 12

2 Answers2

1

You should synchronize or rather hold the lock or monitor for the object that you are calling wait() or notify() on.

Here is what will help you : wait() throwing IllegalArgumentException

synchronized(lockObject){

     lockObject.wait(); //you should hold the lock to be able to call wait()
}
Community
  • 1
  • 1
Narendra Pathai
  • 41,187
  • 18
  • 82
  • 120
  • Thank you ,is it like I add Synchronized(lockObject) instead of normal wait. – Babu Feb 19 '13 at 09:33
  • Is the name of lockOject correspond to the name of thread for first run method. – Babu Feb 19 '13 at 09:36
  • No. you will have a single object that will be shared among all the threads and all will lock the object and call wait on that object. And one thread will lock and call notify on the same object. The name of lock has nothing to do with the name of the thread. You should read basic threading tutorials. – Narendra Pathai Feb 19 '13 at 14:07
1

Alternatively you could use a BlockingQueue to transfer the data between threads.

See BlockingQueue for details.

// The end of the list.
private static final Integer End = -1;

static class Producer implements Runnable {
  final Queue<Integer> queue;
  private int i = 0;

  public Producer(Queue<Integer> queue) {
    this.queue = queue;
  }

  @Override
  public void run() {
    try {
      for (int i = 0; i < 1000; i++) {
        queue.add(i++);
        Thread.sleep(1);
      }
      // Finish the queue.
      queue.add(End);
    } catch (InterruptedException ex) {
      // Just exit.
    }
  }
}

static class Consumer implements Runnable {
  final Queue<Integer> queue;
  private int i = 0;

  public Consumer(Queue<Integer> queue) {
    this.queue = queue;
  }

  @Override
  public void run() {
    boolean ended = false;
    while (!ended) {
      Integer i = queue.poll();
      if ( i != null ) {
        ended = i == End;
        System.out.println(i);
      }
    }
  }
}

public void test() throws InterruptedException {
  Queue queue = new LinkedBlockingQueue();
  Producer p = new Producer(queue);
  Consumer c = new Consumer(queue);
  Thread pt = new Thread(p);
  Thread ct = new Thread(c);
  // Start it all going.
  pt.start();
  ct.start();
  // Close it down
  pt.join();
  ct.join();
}
OldCurmudgeon
  • 64,482
  • 16
  • 119
  • 213
  • Thank you, Is this like fork join pool, because here am using Thread pool.. and my code keeps on updating again and again, and the poller not only reads the new message but also polls from the first... – Babu Feb 19 '13 at 09:32
  • 1
    `ForkJoinPool` is a feature of Java 7. This code does not make use of it. All this code is doing is demonstrating how simple it is to use a `BlockingQueue` to communicate between two threads. Using `wait/notify` is not just far more complex it is a minefield of caveats and dangers. – OldCurmudgeon Feb 19 '13 at 09:41
  • " Wait/notify is minefield" - truely said, I will try implementing Blocking queue and return back thank you... – Babu Feb 19 '13 at 09:47
  • 1
    In http://stackoverflow.com/questions/14772236/illegal-monitor-state-exception some people told you how to avoid IllegalMonitorStateExceptions. Maybe you should look at the higher-level classes in java.util.concurrent, too. BlockingQueues are already implemented there. – Ralf H Feb 19 '13 at 10:14
  • @RalfH, I got cleared with the IllegalMonitorStateExceptions, but rite now in my code am having a logical error which my code reapeatedly polls fro the first instead of polling only new message also repeatedly updates.... – Babu Feb 19 '13 at 10:30
  • @RalfH some people answered what the question meant to ask and nothing more than that. – Narendra Pathai Feb 19 '13 at 14:09
  • @NarendraPathai That’s fine. For some of us, english is not their first language. So there can always be misunderstanding, therefore I asked for some clarity for me. Seems it was clear enough for you already :) – Ralf H Feb 19 '13 at 14:20