2

I have a list of files and a list of analyzers that analyze those files. The number of files can be large (200,000) and so can the number of analyzers (1,000), so the total number of operations can be really large (200,000,000). Now I need to apply multithreading to speed things up. I followed this approach:

ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
for (File file : listOfFiles) {
  for (Analyzer analyzer : listOfAnalyzers){
    executor.execute(() -> {
      boolean exists = file.exists();
      if(exists){
        analyzer.analyze(file);
      }
    });
  }
}
executor.shutdown();
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);

But the problem with this approach is that it takes too much memory, and I guess there is a better way to do it. I'm still a beginner at Java and multithreading.

Anthony J.
  • 375
  • 1
  • 5
  • 14
  • You’re only running a handful of threads in your thread pool, since `availableProcessors()` would return at most a few hundred on even a high end server. So the memory problem isn’t due to your use of multithreading, it’s due to your `analyze(File)` method. Change that method to use less memory. Are you reading the entire content of each file into memory, perchance? – VGR Jun 28 '18 at 15:14
  • 1
    The `if(file.exists()) { analyze(file) }` looks suspicious to me. If something could have caused the file to cease existing while the `File` object was waiting in the `listOfFiles`, then what (apart from sheer luck) prevents the same whateveritis from causing the file to cease its existence after the `file.exists()` test returns `true`, but before `analyze(file)` gets a chance to open it? A better strategy might be to just go ahead and call `analyze(file)` without the existence test, and then catch the exception that `analyze()` probably would throw if the file isn't there. – Solomon Slow Jun 28 '18 at 15:55

2 Answers

5

Where are 200M tasks going to reside? Not in memory, I hope, unless you plan to implement your solution in a distributed fashion. In the meantime, you need to instantiate an ExecutorService that does not accumulate a massive queue. Use the "caller runs policy" (ThreadPoolExecutor.CallerRunsPolicy) when you create the service. If you try to put another task in the queue when it's already full, you'll end up executing it yourself, which is probably what you want.
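
In code, that could look something like the following (a sketch only; the queue capacity and pool sizes are illustrative placeholders, not tuned values):

// Needs: java.util.concurrent.*
int threads = Runtime.getRuntime().availableProcessors();
// Bounded queue: at most 100 tasks wait; anything beyond that runs in the submitting thread.
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(100);
RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();
ExecutorService executor =
    new ThreadPoolExecutor(threads, threads, 1, TimeUnit.SECONDS, queue, handler);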

OTOH, now that I look at your question more conscientiously, why not analyze a single file concurrently? Then the queue is never larger than the number of analyzers. That's what I'd do, frankly, since I'd like a readable log that has a message for each file as I load it, in the correct order.

I apologize for not being more helpful:

analyzers.stream().map(analyzer -> executor.submit(() -> analyzer.analyze(file)))
         .collect(Collectors.toList()).forEach(Future::get);

Basically, create a bunch of futures for a single file (collecting them first, so every task is actually submitted before you start waiting), then wait for all of them before you move on to the next file.
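
Spelled out against the question's listOfFiles, listOfAnalyzers and its fixed thread pool, that might look roughly like this (a sketch; it assumes analyze throws no checked exceptions, as in the question, and wraps the checked exceptions from Future.get() in an unchecked one):

// Needs: java.util.List, java.util.concurrent.*, java.util.stream.Collectors
for (File file : listOfFiles) {
  // Submit one task per analyzer for this file...
  List<Future<?>> futures = listOfAnalyzers.stream()
      .map(analyzer -> executor.submit(() -> analyzer.analyze(file)))
      .collect(Collectors.toList());

  // ...then wait for all of them before moving on, so the queue never holds
  // more than one file's worth of tasks.
  for (Future<?> future : futures) {
    try {
      future.get();
    } catch (InterruptedException | ExecutionException e) {
      throw new RuntimeException("Analysis failed for " + file, e);
    }
  }
}
executor.shutdown();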

AbuNassar
  • 1,128
  • 12
  • 12
  • Thanks for the "CallerRunsPolicy", I will take a look at it. "Where are 200M tasks going to reside?" The `analyze` method initializes many local variables, so my understanding is that when `executor.execute()` is called, all those local variables will be initialized, no?... As for the second edited part, "analyze a single file concurrently?", am I not doing so already? Also, it would be so helpful to provide even pseudo-code. – Anthony J. Jun 28 '18 at 14:03
  • 1
    No, for every file, you're queuing up 1000 analysis tasks, then immediately moving on to the next file. That's why your queue is blowing up. One way to do this is, for every file, turn the `List` of analyzers into a stream (see https://stackoverflow.com/a/43226982/476942). In pseudo-code: – AbuNassar Jun 28 '18 at 14:27
  • 1
    This is _not_ how I'd recommend doing it; Java has much easier ways of leveraging concurrency nowadays. But if you simply want to fix the memory overrun, make sure that your `ExecutorService` isn't queueing up millions of tasks: `LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(100); RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy(); ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 8, 1, TimeUnit.SECONDS, queue, handler);` – AbuNassar Aug 14 '18 at 17:46
  • @AbuNassar Can I ask, will the CallerRunsPolicy task be executed sequentially, or will it be executed immediately once the task is rejected (even though the earlier tasks are all still queued)? – experiment unit 1998X Feb 07 '23 at 08:41
  • I honestly don't know. I would _assume_ that there are absolutely no guarantees in such a case. A task that has to run in the calling thread shouldn't get priority over tasks already in the (full) queue. Or _vice versa_, really. – AbuNassar Feb 07 '23 at 19:34
2

One idea is to employ the fork/join framework and group the items (files) into batches that can be processed independently.

My suggestion is the following:

  1. Firstly, filter out all files that do not exist - they occupy resources unnecessarily.
  2. The following pseudo-code demonstrates the algorithm that might help you out:

    // Requires: java.io.File, java.util.concurrent.ForkJoinTask, java.util.concurrent.RecursiveTask
    public static class CustomRecursiveTask extends RecursiveTask<Integer> {

        private final Analyzer[] analyzers;
        private final int threshold;
        private final File[] files;
        private final int start;
        private final int end;

        public CustomRecursiveTask(Analyzer[] analyzers,
                                   final int threshold,
                                   File[] files,
                                   int start,
                                   int end) {
            this.analyzers = analyzers;
            this.threshold = threshold;
            this.files = files;
            this.start = start;
            this.end = end;
        }

        @Override
        protected Integer compute() {
            final int filesToProcess = end - start;
            if (filesToProcess < threshold) {
                return processSequentially();
            } else {
                final int middle = (start + end) / 2;

                // Split [start, end) into [start, middle) and [middle, end)
                final ForkJoinTask<Integer> left =
                        new CustomRecursiveTask(analyzers, threshold, files, start, middle);
                final ForkJoinTask<Integer> right =
                        new CustomRecursiveTask(analyzers, threshold, files, middle, end);
                left.fork();
                right.fork();

                return left.join() + right.join();
            }
        }

        private Integer processSequentially() {
            for (int i = start; i < end; i++) {
                File file = files[i];
                for (Analyzer analyzer : analyzers) {
                    analyzer.analyze(file);
                }
            }
            // Report how many files this leaf handled so the joined totals add up.
            return end - start;
        }
    }
    

And the usage looks like this:

    public static void main(String[] args) {
        final Analyzer[] analyzers = new Analyzer[]{};
        final File[] files = new File[]{};

        // Keep the threshold at least 1 so the recursion always terminates.
        final int threshold = Math.max(1, files.length / 5);

        // invoke() blocks until the whole task tree has completed and returns the result.
        final int filesProcessed = ForkJoinPool.commonPool().invoke(
                new CustomRecursiveTask(
                        analyzers,
                        threshold,
                        files,
                        0,
                        files.length
                )
        );
        System.out.println(filesProcessed + " files analyzed");
    }

Note that, depending on your constraints, you can adjust the task's constructor arguments so that the algorithm adapts to the number of files.

You could specify different thresholds, say, depending on the number of files:

final int threshold;
if(files.length > 100_000) {
   threshold = files.length / 4;
} else {
   threshold = files.length / 8;
}

You could also specify the number of worker threads in the ForkJoinPool depending on the input size.
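
For example (a sketch; the parallelism value here is an illustrative placeholder to be tuned by measurement):

    // A dedicated pool with an explicit parallelism level instead of the common pool.
    final int parallelism = Math.min(Runtime.getRuntime().availableProcessors(), 8);
    final ForkJoinPool pool = new ForkJoinPool(parallelism);
    final int filesProcessed =
            pool.invoke(new CustomRecursiveTask(analyzers, threshold, files, 0, files.length));
    pool.shutdown();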

Measure, adjust, and modify; you will solve the problem eventually.

Hope that helps.

UPDATE:

If the result of the analysis is of no interest, you could replace the RecursiveTask with a RecursiveAction; the RecursiveTask version above adds auto-boxing overhead along the way.
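
For reference, a sketch of what that RecursiveAction variant could look like (same fields and splitting logic as the task above, just with no result to box):

    public static class CustomRecursiveAction extends RecursiveAction {

        private final Analyzer[] analyzers;
        private final int threshold;
        private final File[] files;
        private final int start;
        private final int end;

        public CustomRecursiveAction(Analyzer[] analyzers, int threshold,
                                     File[] files, int start, int end) {
            this.analyzers = analyzers;
            this.threshold = threshold;
            this.files = files;
            this.start = start;
            this.end = end;
        }

        @Override
        protected void compute() {
            if (end - start < threshold) {
                // Small enough batch: analyze sequentially, nothing to return.
                for (int i = start; i < end; i++) {
                    for (Analyzer analyzer : analyzers) {
                        analyzer.analyze(files[i]);
                    }
                }
            } else {
                final int middle = (start + end) / 2;
                // No results to combine, so both halves can simply be invoked together.
                invokeAll(new CustomRecursiveAction(analyzers, threshold, files, start, middle),
                          new CustomRecursiveAction(analyzers, threshold, files, middle, end));
            }
        }
    }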

dawid gdanski
  • 2,432
  • 3
  • 21
  • 29
  • The `ForkJoinPool` is recommended for "divide & conquer" problems, which the OP's problem is not. There's nothing recursive about it, as far as we know, unless analyses are being recursively combined. – AbuNassar Jun 28 '18 at 17:36
  • Correct me if I am wrong, but the goal here is to analyze every file with all available analyzers, so divide and conquer is one of the acceptable approaches that works in a "distributed fashion", just like you recommended. What's more, the fork/join approach proposed here is more flexible in terms of preserving performance/memory than what was originally suggested. – dawid gdanski Jun 29 '18 at 07:15
  • thanks for this.... I have to take a deeper look at this fork/join algorithm as I barely scratched the surface of multithreading in java. – Anthony J. Jun 30 '18 at 15:26