I have n worker threads that retrieve records from a Kinesis stream (this detail is not important for the problem). The records are then pushed onto an executor service, where they are processed and persisted to a backend database. The same executor service instance is shared by all worker threads.

Now there is a scenario in which a given worker loop stops processing records and blocks until all of the records it submitted have been completely processed. In other words, the executor service must have no pending or running tasks left for records from that particular worker thread.

A very simplified version of the implementation looks like this:

  1. Worker class

    public class Worker {

        private final Listener listener;

        Worker(Listener listener) {
            this.listener = listener;
        }

        // called periodically with records fetched from a Kinesis stream
        public void processRecords(List<Record> records) {
            for (Record record : records) {
                listener.handleRecord(record);
            }

            // if 15 minutes have elapsed, run the code below. This is blocking.
            listener.blockTillAllRecordsAreProcessed();
        }
    }

  2. Listener class

    public class Listener {

        // the same executor service is shared across all listeners
        private final ExecutorService es;

        Listener(ExecutorService es) {
            this.es = es;
        }

        public void handleRecord(Record record) {
            // submit record to es and return
            // non-blocking
        }

        public boolean blockTillAllRecordsAreProcessed() {
            // this should block until all records submitted by this listener are processed
            // no clue how to implement this with a common es
        }
    }
    

The only approach I could think of is to give each worker its own local executor service and do something like invokeAll for each batch. That would change the implementation slightly but get the job done. Still, I feel there should be a better approach to this problem.
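For comparison, the local-executor idea above could look roughly like this. It is a minimal sketch under stated assumptions, not the asker's actual code: records are plain `String`s, `persist` is a hypothetical placeholder for the processing and database write, and the pool size is arbitrary. `ExecutorService.invokeAll` returns only once every task in the batch has completed, so each `processBatch` call blocks for its own batch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Sketch: each worker owns a private pool and waits for each batch with invokeAll.
// Records are Strings here; persist() is a hypothetical stand-in for the real work.
class LocalExecutorWorker implements AutoCloseable {
    private final ExecutorService localPool;

    LocalExecutorWorker(int threads) {
        this.localPool = Executors.newFixedThreadPool(threads);
    }

    // Blocks until every record in this batch has finished; returns how many were persisted.
    int processBatch(List<String> records) throws InterruptedException, ExecutionException {
        List<Callable<Boolean>> tasks = new ArrayList<>();
        for (String record : records) {
            tasks.add(() -> persist(record));
        }
        int persisted = 0;
        // invokeAll returns only after all tasks have completed (normally or by throwing)
        for (Future<Boolean> result : localPool.invokeAll(tasks)) {
            if (result.get()) { // get() rethrows a task failure wrapped in ExecutionException
                persisted++;
            }
        }
        return persisted;
    }

    // Hypothetical placeholder: empty records are skipped, everything else counts as stored.
    private boolean persist(String record) {
        return !record.isEmpty();
    }

    @Override
    public void close() {
        localPool.shutdown();
    }
}
```

The trade-off is that the worker now blocks once per batch instead of once every 15 minutes, and every worker carries its own thread pool instead of sharing one.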

Priyath Gregory
1 Answer

You could use the CountDownLatch class to block, as follows:

    public void processRecords(List<Record> records) {
        CountDownLatch latch = new CountDownLatch(records.size());
        for (Record record : records) {
            listener.handleRecord(record, latch);
        }

        // if 15 minutes have elapsed, run the code below. This is blocking.
        listener.blockTillAllRecordsAreProcessed(latch);
    }

    public class Listener {
        ExecutorService es;
        ...
        public void handleRecord(Record record, CountDownLatch latch) {
            // submit record to es and return
            // non-blocking
            es.submit(() -> {
                try {
                    someSyncTask(record);
                } finally {
                    // count down even if the task throws, so the waiting worker cannot hang
                    latch.countDown();
                }
            });
        }

        public boolean blockTillAllRecordsAreProcessed(CountDownLatch latch) {
            System.out.println("waiting for processes to complete....");
            try {
                // the current thread blocks here until the latch count reaches zero,
                // i.e. until every task submitted with this latch has finished
                latch.await();
                return true;
            } catch (InterruptedException e) {
                // restore the interrupt flag instead of swallowing it
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

Read more here: https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CountDownLatch.html
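A self-contained way to try the latch approach is sketched below, assuming Java 8+. Records are plain `String`s and the `handler` callback is a hypothetical stand-in for `someSyncTask`. Two "workers" share one fixed thread pool; each creates its own `CountDownLatch` sized to its batch, so each one waits only for its own records, not for everything in the pool. The timed `await` overload returns `false` instead of blocking forever if a batch does not finish within the timeout.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

class SharedPoolLatch {
    // Submits one worker's batch to the shared pool and blocks until that batch is done.
    // Returns false if the timeout elapses first. Other workers' tasks are not waited on.
    static boolean submitAndAwait(ExecutorService shared, List<String> batch,
                                  Consumer<String> handler, long timeoutSeconds)
            throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(batch.size());
        for (String record : batch) {
            shared.execute(() -> {
                try {
                    handler.accept(record);
                } finally {
                    latch.countDown(); // release the waiter even if the handler throws
                }
            });
        }
        return latch.await(timeoutSeconds, TimeUnit.SECONDS);
    }

    // Two "workers" share one pool; returns {batch A finished?, A's count, B's count}.
    static int[] demo() throws InterruptedException {
        ExecutorService shared = Executors.newFixedThreadPool(4);
        AtomicInteger countA = new AtomicInteger();
        AtomicInteger countB = new AtomicInteger();
        Thread workerB = new Thread(() -> {
            try {
                submitAndAwait(shared, Arrays.asList("b1", "b2"), r -> countB.incrementAndGet(), 10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        workerB.start();
        boolean doneA = submitAndAwait(shared, Arrays.asList("a1", "a2", "a3"),
                r -> countA.incrementAndGet(), 10);
        int seenA = countA.get(); // all of A's records are done once await returns true
        workerB.join();
        shared.shutdown();
        return new int[] {doneA ? 1 : 0, seenA, countB.get()};
    }
}
```

Calling `SharedPoolLatch.demo()` should return `{1, 3, 2}`: batch A finished within the timeout, and each worker saw all of its own records processed while sharing the pool with the other.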

Nikhil Joshi
  • Thank you for the response! A potential problem I see with this solution is that, since `blockTillAllRecordsAreProcessed` is only called every 15 minutes, processRecords will be invoked multiple times during that time span. Each invocation submits another batch to the executor service, but when the blocking code is finally called we only have a reference to the latest latch object. – Priyath Gregory Oct 02 '20 at 17:11
  • However, your answer has definitely given some good insight. https://stackoverflow.com/questions/1636194/flexible-countdownlatch – Priyath Gregory Oct 02 '20 at 17:34
  • @fsociety Yeah, I had the same doubts too. However, you could maybe use an ArrayList to track all the latches and await each of them in a for loop, or something similar. – Nikhil Joshi Oct 02 '20 at 18:26
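On the concern about several batches being submitted between two blocking calls: instead of one latch per batch (or a list of latches), each listener could keep a single count of its own in-flight tasks that grows as records are submitted and shrinks as they finish. The sketch below uses `java.util.concurrent.Phaser` for that; it is a swapped-in technique, not something proposed in the thread. Assumptions: records are `String`s, the `processor` callback is a hypothetical stand-in for processing and persistence, and `handleRecord` and `blockTillAllRecordsAreProcessed` are always called from the same worker thread, as in the question's `Worker`. A `Phaser` supports at most 65,535 registered parties, so this only works while one listener has fewer in-flight tasks than that; beyond that limit a plain counter guarded by a lock would be needed.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Phaser;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Consumer;

// Sketch: one Phaser per listener counts that listener's in-flight tasks,
// no matter how many processRecords batches submitted them.
class PhaserListener {
    private final ExecutorService es;          // shared across all listeners
    private final Consumer<String> processor;  // hypothetical process-and-persist step
    // The listener itself is the one permanently registered party.
    private final Phaser inFlight = new Phaser(1);

    PhaserListener(ExecutorService es, Consumer<String> processor) {
        this.es = es;
        this.processor = processor;
    }

    // Non-blocking: registers one party per task, which deregisters when the task ends.
    public void handleRecord(String record) {
        inFlight.register();
        try {
            es.execute(() -> {
                try {
                    processor.accept(record);
                } finally {
                    inFlight.arriveAndDeregister();
                }
            });
        } catch (RejectedExecutionException e) {
            inFlight.arriveAndDeregister(); // the task never ran, so undo its registration
            throw e;
        }
    }

    // Blocks until every task this listener has submitted so far has finished.
    // The phase advances only once the listener and all still-registered tasks have arrived.
    public void blockTillAllRecordsAreProcessed() {
        int phase = inFlight.arrive();
        inFlight.awaitAdvance(phase);
    }
}
```

Because the phaser only waits on this listener's own registrations, other workers keep using the shared pool undisturbed while one worker is blocked, and the listener stays registered, so the same phaser can be reused for the next 15-minute window.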