1

Requirement: I have a batch job that processes 1 million records. I store the 1 million records in an ArrayList and iterate over it, making a 3rd party external call for each record. The third party responds with HTTP 200, 400, or 500. Only when the response is 500 do I have to update the database for that specific record.

Problem: To speed up processing I am trying to run the 3rd party calls on multiple threads. But I am stuck on how, once the calls are threaded, to process the responses so that I can make the database update. I don't want to do the DB update inside the threads, because if multiple threads try to update the DB at the same time there will be a DB deadlock.

My Effort: I was trying to declare a singleton ArrayList and store in it the record number of every record for which the 3rd party call returned 500. When all the 3rd party calls are complete, I would iterate over that singleton ArrayList to fetch the records and update them in the DB.

RoadBlock: Even in this case I am unable to figure out how to make the threads write sequentially, so that I can safely store the records in the singleton ArrayList.

Code:

class callExtPrty implements Runnable {

    private final String recordNumber;

    public callExtPrty(String recordNumber) {
        this.recordNumber = recordNumber;
    }

    public void run() {
        // externalCall() stands for the 3rd party HTTP call and returns the status code
        int response = externalCall(recordNumber);
        if (response == 500) {
            singletonList.add(recordNumber);
        }
    }
}

class recordProcessorDAO {

    public void processRecords() {

        List<String> dbRecordList = new ArrayList<String>();

        //DB call to add 1 million records to dbRecordList

        Iterator<String> recordList = dbRecordList.iterator();
        while (recordList.hasNext()) {
            new callExtPrty(recordList.next());
        }

        //Getting the singleton list populated by the 3rd party call
        Iterator<String> failedRecords = singletonList.iterator();
        while (failedRecords.hasNext()) {
            String recordNumber = failedRecords.next();
            //DB call to update the record fetched from singletonList
        }
    }
}

Can anyone help me design this properly? Threading is needed for performance, as the job processes 1 million records in one go and runs for around 12-13 hours.

Thanks

Nirmalya

4 Answers

0

You should make the HTTP calls in a multi-threaded way, as you have already done. Instead of hand-rolling a Runnable, you could use an ExecutorService. It makes the code easier to maintain.
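A minimal sketch of the ExecutorService approach, assuming the 3rd party call can be expressed as a function from record number to HTTP status code. The names `ExternalCallRunner`, `findFailedRecords`, `externalCall` and `poolSize` are hypothetical and not from the question. The worker threads only make the calls; the list of failed records is built on the calling thread, so no shared list is needed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.ToIntFunction;

class ExternalCallRunner {

    // Runs externalCall for every record on a fixed-size pool and returns the
    // record numbers whose status was 500. externalCall and poolSize are
    // assumptions supplied by the caller.
    static List<String> findFailedRecords(List<String> records,
                                          ToIntFunction<String> externalCall,
                                          int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (String recordNumber : records) {
                tasks.add(() -> externalCall.applyAsInt(recordNumber));
            }
            // invokeAll blocks until every task has completed and returns
            // the futures in the same order as the tasks.
            List<Future<Integer>> results = pool.invokeAll(tasks);
            List<String> failed = new ArrayList<>();
            for (int i = 0; i < records.size(); i++) {
                if (results.get(i).get() == 500) {
                    failed.add(records.get(i));
                }
            }
            return failed;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for calls", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("External call threw an exception", e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

For 1 million records this holds one Future per record in memory, so submitting the records in chunks may be preferable; that is a tuning choice rather than part of the technique.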

As far as the database update is concerned, you should batch those updates and apply them in one shot. Try to write the query like this: UPDATE Table SET Column=Value WHERE KEY IN (a,b,c,d). Index the key if it's not already indexed.

As of now, these values are stored in memory. If you don't want to keep them only in memory, to make the job failsafe and re-runnable, you could use an external cache like Redis that stores each HTTP request and its response as a key-value pair. If your code breaks or the system crashes and you have to re-run the whole thing, you can look up the stored response instead of repeating the HTTP call.

Batching Logic: Let's say you get X HTTP responses, of which Y are HTTP 500. Instead of updating the database once per record, you fire one update for every, let's say, 1000 of those Y records. This significantly reduces the number of database queries you fire.

This would be done in one main thread, which receives the callbacks from the other threads handling the HTTP calls. So there is no chance of multiple threads writing to the DB.
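The batching logic above can be sketched as two small helpers, run on the single thread that owns the database connection. The class name `BatchUpdatePlanner`, the table name `records` and the columns `status` and `record_number` are placeholders, not taken from the question.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class BatchUpdatePlanner {

    // Splits the failed record numbers into chunks of at most batchSize
    // (batchSize is assumed to be greater than zero).
    static List<List<String>> partition(List<String> ids, int batchSize) {
        List<List<String>> batches = new ArrayList<>();
        for (int start = 0; start < ids.size(); start += batchSize) {
            int end = Math.min(start + batchSize, ids.size());
            batches.add(new ArrayList<>(ids.subList(start, end)));
        }
        return batches;
    }

    // Builds a parameterized statement with one "?" per key in the batch.
    // Table and column names are hypothetical.
    static String buildUpdateSql(int keyCount) {
        String placeholders = String.join(",", Collections.nCopies(keyCount, "?"));
        return "UPDATE records SET status = ? WHERE record_number IN (" + placeholders + ")";
    }
}
```

Each batch would then be bound to a PreparedStatement built from `buildUpdateSql(batch.size())` and executed once. Some databases cap the number of entries in an IN list, so the batch size may need to stay below that limit.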

One more suggestion: use connection pooling. You could also cache the results of the HTTP calls locally; since you mentioned a list data structure, which can hold duplicates, you'll end up saving some HTTP calls.
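A rough sketch of the local cache idea, assuming the 3rd party returns the same status for the same record number within one run. `CachingCaller` and `externalCall` are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.ToIntFunction;

class CachingCaller {

    // Status codes already obtained, keyed by record number.
    private final Map<String, Integer> statusByRecord = new ConcurrentHashMap<>();
    private final ToIntFunction<String> externalCall;

    CachingCaller(ToIntFunction<String> externalCall) {
        this.externalCall = externalCall;
    }

    // Makes the external call only the first time a record number is seen;
    // later calls for the same record number return the cached status.
    int call(String recordNumber) {
        return statusByRecord.computeIfAbsent(recordNumber, externalCall::applyAsInt);
    }
}
```

Note that `computeIfAbsent` on a ConcurrentHashMap may block other updates to the map while the mapping function runs, so with slow HTTP calls a different caching strategy could be a better fit.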

Sandeep Kaul
0

You just need to

  • Divide work for each thread, so that no two threads share the same work

  • Wait for ALL threads to finish; by then one of the threads should have solved the problem.

  • Do not forget to notify the other threads once the problem is solved, so they stop searching.

Example:

private CopyOnWriteArrayList<Integer> list; // typed so that list.get(k) == value compares numbers

private class Shared<T> {
   private T data;
   public synchronized T getData() { return data; }
   public synchronized void setData(T data) { this.data = data; }
}

public boolean multiThreadedSearch(final int value) {
   int numThreads = 4;
   int threadWork = list.size() / numThreads;
   final Shared<Boolean> found = new Shared<>();
   found.setData(false);
   Thread[] threads = new Thread[numThreads];
   for (int i = 0; i < numThreads; ++i) {
      final int myStart = i * threadWork;
      final int myEnd = i == numThreads - 1 ?
            list.size() : (i + 1) * threadWork;
      threads[i] = new Thread(new Runnable() {
         public void run() {
            for (int k = myStart; k < myEnd && !found.getData(); ++k) {
                if (list.get(k) == value) {
                   found.setData(true);
                }
            }
         }
      });
   }
   for (Thread t : threads) t.start();
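   //now wait for them to finish
   for (Thread t : threads) {
      try {
        t.join();
      } catch (InterruptedException ex) {
        // restore the interrupt flag instead of silently swallowing it
        Thread.currentThread().interrupt();
      }
   }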
   return found.getData();
}

You can call multiThreadedSearch in a separate thread or on the main thread.

0

You don't need to put everything in an arraylist and you can make the multithreading much simpler as well.

Basic strategy: You are going to spend a big chunk of time reading from the database connection, a bunch of time waiting for each request to come back and then a bunch of time doing database calls for the HTTP 500 responses. So the best way to split this up is:

Make a ThreadPoolExecutor with a bunch of threads (fiddle with it to find the right size; I'd start off around 8 max worker threads), a CallerRunsPolicy and a SynchronousQueue to feed it. Nothing complicated here.

Run your initial query. As rows come in, call execute(), passing it a Runnable() that does the following for each database row in the query results:

1) do the HTTP request with the data from the database record

2) look at the result

3) do a database call to update the record if necessary. This whole part should be within a synchronized block based on something simple and unique like DB record ID. This way you don't get two threads updating the same db record at the same time.

And you're done.

ThreadPoolExecutor has afterExecute() to handle errors, or you can do a try/catch in the run() method, which is even easier.
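A minimal sketch of that setup, assuming each row can be handled by one function that does the HTTP request and, if needed, the database update. `RowProcessor`, `handleRow` and `maxWorkers` are hypothetical names. With a SynchronousQueue and CallerRunsPolicy, the thread reading the rows runs a task itself whenever all workers are busy, which keeps it from racing ahead of the pool.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class RowProcessor {

    // Pool with no task buffer: a task is either handed to an idle worker,
    // started on a new worker, or run by the submitting thread.
    static ThreadPoolExecutor newPool(int maxWorkers) {
        return new ThreadPoolExecutor(
                maxWorkers, maxWorkers,
                60L, TimeUnit.SECONDS,
                new SynchronousQueue<>(),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    // Submits one task per row and waits for all of them to finish.
    // Returns false if the wait timed out or was interrupted.
    static boolean processAll(Iterable<String> rows, Consumer<String> handleRow, int maxWorkers) {
        ThreadPoolExecutor pool = newPool(maxWorkers);
        for (String row : rows) {
            pool.execute(() -> {
                try {
                    handleRow.accept(row); // HTTP call, check result, update DB if needed
                } catch (RuntimeException e) {
                    System.err.println("Row " + row + " failed: " + e);
                }
            });
        }
        pool.shutdown();
        try {
            return pool.awaitTermination(1, TimeUnit.HOURS); // timeout is an arbitrary choice
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```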

Jim W
0

You have to use a callback mechanism with FutureTask.

Solution to your problem:

  1. Create a pool with Executors.newWorkStealingPool(), or use a ForkJoinPool, with the number of CPU cores as its size.

  2. In your Callable or Runnable task, add the business logic together with a Callback class.

  3. When you get an error from the 3rd party API, call the Callback class method.
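The three steps above can be sketched as follows. The `ErrorCallback` interface, `CallbackRunner` and `externalCall` are hypothetical names used for illustration. The callback runs on the worker threads, so whatever it does has to be thread-safe, for example adding to a concurrent queue that a single thread drains for the database updates afterwards.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.ToIntFunction;

// Hypothetical callback invoked when the 3rd party returns HTTP 500.
interface ErrorCallback {
    void onServerError(String recordNumber);
}

class CallbackRunner {

    // Runs one task per record on a work-stealing pool and notifies the
    // callback for each 500. Returns false on timeout or interruption.
    static boolean run(List<String> records,
                       ToIntFunction<String> externalCall,
                       ErrorCallback callback) {
        // Parallelism defaults to the number of available processors.
        ExecutorService pool = Executors.newWorkStealingPool();
        for (String recordNumber : records) {
            pool.execute(() -> {
                if (externalCall.applyAsInt(recordNumber) == 500) {
                    callback.onServerError(recordNumber);
                }
            });
        }
        pool.shutdown();
        try {
            return pool.awaitTermination(1, TimeUnit.HOURS); // timeout is an arbitrary choice
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Since the tasks spend most of their time blocked on HTTP, a pool sized to the CPU core count may be smaller than ideal; the right size is something to measure.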

Related SE questions:

Executing Java callback on a new thread

Java executors: how to be notified, without blocking, when a task completes?

Community
Ravindra babu