
My ultimate goal is to achieve batch processing of lines in an Executor Service. I have the following snippet of code:

while ((bLine = bufferedReader.readLine()) != null) {
    // We will use an array to hold 100 lines, so that we can batch process in a
    // single thread
    batchArray.add(bLine);
    switch (batchArray.size()) {
        case 100:
            Future<?> future = executor.submit(new LocalThreadPoolExecutor(batchArray, closeableHttpClient, httpPost));
            futures.add(future);
            // batchArray.clear(); <--- point of failure
            break;
        default:
            logger.info("Batch size in switch: "+batchArray.size());

    }
}

If I call batchArray.clear() in case 100, I get a ConcurrentModificationException. I can't work out how to reinitialize the ArrayList so that each batch of 100 lines is sent to my executor as the lines are read from the file.

Below is the stack trace:

java.util.ConcurrentModificationException
    at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:859)
    at java.util.ArrayList$Itr.next(ArrayList.java:831)
    at consumer.ril.com.LocalThreadPoolExecutor.run(LocalThreadPoolExecutor.java:37)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

I get this exception in my LocalThreadPoolExecutor class when it tries to read batchArray, which is passed to the constructor of this class.

User3

3 Answers


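Your batchArray object is not copied when you pass it to the constructor: Java passes the object reference by value, so both variables point to the same list. See this for more details: Is Java "pass-by-reference" or "pass-by-value"?

As such, your LocalThreadPoolExecutor still holds a reference to that same list, and clearing it in the reading loop while the task is iterating over it causes the exception.

Passing a copy (for example a clone) should work. Making the parameter final would not help: final only prevents the variable from being reassigned, not the list from being modified.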
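A minimal sketch of the difference between sharing the reference and passing a copy. `ReferenceVsCopy`, `Holder` and `sizeSeenAfterClear` are hypothetical names; `Holder` only stands in for LocalThreadPoolExecutor by storing the list it receives, and its parameter is declared final to show that this does not protect the list's contents.

```java
import java.util.ArrayList;
import java.util.List;

class ReferenceVsCopy {

    // Hypothetical stand-in for LocalThreadPoolExecutor: it just keeps the list it is given.
    static final class Holder {
        final List<String> lines; // 'final' fixes the reference, not the list contents

        Holder(final List<String> lines) {
            this.lines = lines;
        }
    }

    /** Returns how many lines the holder still sees after the caller clears its own list. */
    static int sizeSeenAfterClear(boolean passCopy) {
        List<String> batchArray = new ArrayList<>();
        batchArray.add("line 1");
        batchArray.add("line 2");

        Holder holder = new Holder(passCopy ? new ArrayList<>(batchArray) : batchArray);
        batchArray.clear(); // the same call that fails in the question

        return holder.lines.size();
    }

    public static void main(String[] args) {
        System.out.println("shared reference: " + sizeSeenAfterClear(false)); // 0
        System.out.println("copy:             " + sizeSeenAfterClear(true));  // 2
    }
}
```

With the shared reference, the holder's list is emptied by the caller's clear(); with the copy, it keeps both lines.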

Graham
A G
  • I used a clone and it works except for some warnings. – User3 May 30 '17 at 16:21
  • I looked at the implementations of `clone` and the `ArrayList(Collection)` constructor; both seem to be essentially the same. If you don't really need `batchArray` outside the loop, it might be better to implement it as @nick-knysh mentioned below. – A G May 30 '17 at 19:07
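The warnings mentioned in the comments most likely come from the cast that `clone()` needs: `ArrayList.clone()` is declared to return `Object`, so casting the result to `ArrayList<String>` is an unchecked cast. A small sketch (class and method names are hypothetical) comparing it with the copy constructor; both produce a shallow copy that survives clearing the original.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class CloneVsCopyConstructor {

    // clone() returns Object, so this cast is unchecked and javac warns unless suppressed.
    @SuppressWarnings("unchecked")
    static ArrayList<String> viaClone(ArrayList<String> source) {
        return (ArrayList<String>) source.clone();
    }

    // The copy constructor is already typed: no cast, no warning.
    static List<String> viaCopyConstructor(List<String> source) {
        return new ArrayList<>(source);
    }

    public static void main(String[] args) {
        ArrayList<String> batch = new ArrayList<>(Arrays.asList("a", "b", "c"));
        ArrayList<String> cloned = viaClone(batch);
        List<String> copied = viaCopyConstructor(batch);
        batch.clear(); // neither copy is affected
        System.out.println(cloned + " " + copied); // [a, b, c] [a, b, c]
    }
}
```

Both copies are shallow (the String elements are shared), which is harmless here because String is immutable.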

Simple solution: pass LocalThreadPoolExecutor a copy of the list, then clear the original.

// the task gets its own copy, so clearing batchArray no longer affects it
Future<?> future = executor.submit(new LocalThreadPoolExecutor(new ArrayList<>(batchArray), closeableHttpClient, httpPost));
futures.add(future);
batchArray.clear();
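A self-contained sketch of the copy-and-clear loop, assuming a hypothetical `BatchTask` (a `Callable` that only counts the lines it iterates) in place of LocalThreadPoolExecutor, and a `StringReader` in place of the file. It also submits the last, partial batch after the loop; the loop in the question only submits when the list reaches exactly 100 lines, so any remainder would otherwise be dropped.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CopyAndClearBatching {

    // Hypothetical task: iterates its own batch (as LocalThreadPoolExecutor does) and counts lines.
    static final class BatchTask implements Callable<Integer> {
        private final List<String> batch;

        BatchTask(List<String> batch) {
            this.batch = batch;
        }

        @Override
        public Integer call() {
            int count = 0;
            for (String line : batch) { // a real task would send `line` in its HTTP request
                count++;
            }
            return count;
        }
    }

    /** Submits lines in batches of batchSize and returns the total number of lines processed. */
    static int process(BufferedReader reader, int batchSize, ExecutorService executor)
            throws IOException, InterruptedException, ExecutionException {
        List<Future<Integer>> futures = new ArrayList<>();
        List<String> batchArray = new ArrayList<>(batchSize);
        String line;
        while ((line = reader.readLine()) != null) {
            batchArray.add(line);
            if (batchArray.size() == batchSize) {
                futures.add(executor.submit(new BatchTask(new ArrayList<>(batchArray))));
                batchArray.clear(); // safe: the task owns its own copy
            }
        }
        if (!batchArray.isEmpty()) { // leftover lines at end of input
            futures.add(executor.submit(new BatchTask(new ArrayList<>(batchArray))));
        }
        int total = 0;
        for (Future<Integer> f : futures) {
            total += f.get(); // waits for each batch to finish
        }
        return total;
    }

    /** Builds lineCount lines of input and runs them through a 4-thread pool. */
    static int demo(int lineCount, int batchSize) {
        StringBuilder text = new StringBuilder();
        for (int i = 0; i < lineCount; i++) {
            text.append("line ").append(i).append('\n');
        }
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try (BufferedReader reader = new BufferedReader(new StringReader(text.toString()))) {
            return process(reader, batchSize, executor);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag before rethrowing
            throw new IllegalStateException(e);
        } catch (IOException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(250, 100)); // 250: batches of 100, 100 and 50
    }
}
```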

Some code inside LocalThreadPoolExecutor is iterating over the list (with an Iterator). At some point the iterator notices that the list has been modified (cleared) and throws. You should give the task its own copy of the list.
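The check that fires here is ArrayList's fail-fast iterator: `next()` compares the list's modification count with the value it recorded when the iterator was created. The sketch below (hypothetical class name) reproduces that on a single thread so it is deterministic. In the question, clear() runs on another thread, so whether and when the exception appears depends on timing; fail-fast behavior is best-effort there, which is why passing a separate list is the actual fix.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.List;

class FailFastIteratorDemo {

    /** Returns true if clearing the list mid-iteration makes the iterator throw. */
    static boolean clearDuringIterationThrows() {
        List<String> batchArray = new ArrayList<>(Arrays.asList("a", "b", "c"));
        Iterator<String> it = batchArray.iterator(); // what a for-each loop uses internally
        it.next();          // the worker has started reading the batch
        batchArray.clear(); // structural modification by the "reading" side
        try {
            it.next();      // detects the modification and throws
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(clearDuringIterationThrows()); // true
    }
}
```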

Since you don't need the items in the main thread, you could explicitly create a new list for each batch and pass it to the processor:

Something like:

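{
    ...
    List<String> batch;
    while ((batch = getNextBatch(bufferedReader, 100)).size() > 0) {
        // each batch is a new list, so nothing in this thread touches it after submit()
        futures.add(
            executor.submit(new LocalThreadPoolExecutor(batch, closeableHttpClient, httpPost))
        );
    }
    ...
}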

And getNextBatch():

List<String> getNextBatch(BufferedReader bReader, int batchSize) throws IOException {
    List<String> batch = new ArrayList<>(batchSize);
    String bLine;
    // stop at batchSize lines or at end of input, whichever comes first
    while (batch.size() < batchSize && (bLine = bReader.readLine()) != null) {
        batch.add(bLine);
    }
    return batch;
}
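A runnable sketch of this approach, assuming a `StringReader` in place of the file; `NextBatchDemo` and `batchSizes` are hypothetical names, and it records batch sizes instead of submitting to an executor so the batching can be checked on its own. Because the loop ends on an empty list, the final partial batch is returned like any other, and no list is ever cleared.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;

class NextBatchDemo {

    // Same idea as getNextBatch above: a fresh list per call, at most batchSize lines.
    static List<String> getNextBatch(BufferedReader bReader, int batchSize) throws IOException {
        List<String> batch = new ArrayList<>(batchSize);
        String bLine;
        while (batch.size() < batchSize && (bLine = bReader.readLine()) != null) {
            batch.add(bLine);
        }
        return batch;
    }

    /** Returns the sizes of the batches produced from lineCount input lines. */
    static List<Integer> batchSizes(int lineCount, int batchSize) {
        StringBuilder text = new StringBuilder();
        for (int i = 0; i < lineCount; i++) {
            text.append("line ").append(i).append('\n');
        }
        List<Integer> sizes = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(new StringReader(text.toString()))) {
            List<String> batch;
            while (!(batch = getNextBatch(reader, batchSize)).isEmpty()) {
                sizes.add(batch.size()); // a real caller would submit `batch` to the executor here
            }
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(batchSizes(250, 100)); // [100, 100, 50]
    }
}
```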
Nick Knysh