0

The Problem:

I am parsing a large log file (around 625_000_000 lines) and saving it into the database.

public class LogScheduler {

static int fileNumber = 1;

public Importer(IRequestService service) {
    this.service = service;
}

@Override
public  void run() {
    try {
        service.saveAll(getRequestListFromFile("segment_directory/Log_segment_"+fileNumber+".txt"));
        
    } catch (IOException e) {
        e.printStackTrace();
    }

   }
}

The method that runs this thread is:

 public void scheduledDataSave() throws InterruptedException {
    int availableCores = Runtime.getRuntime().availableProcessors();
    String directory = "segment_directory";

    int filesInDirectory = Objects.requireNonNull(new File(directory).list()).length;

    ExecutorService executorService = Executors.newFixedThreadPool(availableCores);

    for (int i = 1; i <= filesInDirectory; i++) {
        executorService.execute(new Importer(service));
    }
    executorService.shutdown();

    }

Inserting the Thread.sleep(); method after the executorService.execute(new Importer(service)); sleeps after the execution of every thread, and not 8 threads like it should since they are in the Executorservice And I have no idea why that happens since it should not behave like that. From what I understand, the ExecutorService should run 8 threads in parallel, finish them, sleep, and start the pool again.

How to "sleep" after every 8 threads?

kryzystof
  • 190
  • 2
  • 13
  • 2
    `The problem is the CPU spike` - why exactly? – Eugene Mar 19 '21 at 16:27
  • @Eugene I ment to say constant 100% CPU. I would like to find a way to reduce it, and incorporste sleep to do so – kryzystof Mar 19 '21 at 16:52
  • this might not be related to `ExecutorService`, but garbage collection threads that need to clean-up after all those files – Eugene Mar 19 '21 at 16:53
  • Your question is not clear. Rather than talking about how you modified the shown code, just show us the code you actually ran. And tell us your goal, what you are trying to achieve, as that is fuzzy and unclear. I’m 2nd to vote to close as unclear. If closed, contribute to edit your Question, as it can be reopened if the shortcomings are fixed. – Basil Bourque Mar 19 '21 at 18:07
  • @BasilBourque thanks for the pointers. Hope this is more clear – kryzystof Mar 19 '21 at 18:25
  • This code is still incomplete. (a) Your first code refers to `this.service` but does not show any such member field. (b) Your prose still talks about calling `Thread.sleep` but I see no such call. See: [*How to create a Minimal, Reproducible Example*](https://stackoverflow.com/help/minimal-reproducible-example) – Basil Bourque Mar 19 '21 at 21:07

2 Answers2

0

Sleeping the thread submitting tasks does not sleep the submitted tasks

Your question is not clear, but apparently centers around your expectation that adding a Thread.sleep after each call to executorService.execute would sleep all the threads of the executor service.

    for ( int i = 1 ; i <= filesInDirectory ; i++ ) {
        executorService.execute( new Importer( service ) );   // Executor service assigns this task to one of the background threads in its backing pool of threads.
        Thread.sleep( Duration.ofMillis( 100 ).toMillis() ) ; // Sleeping this thread doing the looping. *Not* sleeping the background threads managed by the executor service.
    }

Your expectation in incorrect.

That Thread.sleep is sleeping the thread doing the for loop.

The executor service has its own backing pool of threads. Those threads are not affected by a Thread.sleep is some other thread. Those background threads will only sleep if you call Thread.sleep within the code running on each of those threads.

So you are feeding the first task to the executor service. The executor service immediately dispatches that work to one of its backing threads. That task is immediately executed (if a thread is available immediately, and not otherwise occupied by previous tasks).

After assigning that task, your for loop sleeps for a hundred milliseconds, in this example code shown here. While the for loop is asleep, no further tasks are being assigned to the executor service. But while the for loop is asleep, the submitted task is executing on a background thread. That background thread is not sleeping.

Eventually, your for loop thread wakes, assigns a second task, and goes back to sleep. Meanwhile the background thread executes at full speed ahead.

So sleeping the thread submitting tasks does not sleep tasks already submitted.

Waiting for submitted tasks to complete

Your title asks:

ExecutorService should wait until batch of taksk is finished before starting again

After submitting your tasks, call shutdown and awaitTermination on your executor service. After those calls, your code blocks, waiting until all the submitted tasks are are completed/canceled/failed.

ExecutorService executorService = Executors.newVirtualThreadExecutor() ;
… submit tasks to that executor service …
executorService.shutdown() ;
executorSerivce.awaitTermination() ;  // At this point, the flow-of-control blocks until the submitted tasks are done.
System.out.println( "INFO - Tasks on background threads are done. " + Instant.now() );

I would suggest using the ExecutorService#submit method rather than ExecutorService#execute method. The difference is that the first method returns a Future object. You can collect these Future objects as you submit tasks to the executor service. After the shutdown & awaitTermination, you can examine your collection of Future objects to check their completion status.

Project Loom

If Project Loom succeeds, such code will be a bit simpler and more clear. Experimental builds of Project Loom technology are available now, based on early-access Java 17. The Loom team seeks feedback now.

With Project Loom, ExecutorService becomes AutoCloseable. This means we can use try-with-resources syntax to automatically call a new close method on ExecutorService. This close method first blocks until all the tasks are completed/canceled/failed, then shuts down the executor service. No need to call shutdown nor awaitTermination.

By the way, Project Loom also bring virtual threads (fibers). This is likely to dramatically increase the performance of your code because it involves much blocking for storage i/o and database access.

try (
        ExecutorService executorService = Executors.newVirtualThreadExecutor() ;
)
{
    … submit tasks to that executor service …
}
// At this point, with Project Loom technology, the flow-of-control blocks until the submitted tasks are done.
// Also, the `ExecutorService` is automatically closed/shutdown by this point, via try-with-resources syntax.
System.out.println( "INFO - Tasks on background threads are done. " + Instant.now() );

With Project Loom, you can collect the returned Future objects in the same manner as discussed above to examine completion status.


You have other issues in your code. But you've not disclosed enough to address them all.

Basil Bourque
  • 303,325
  • 100
  • 852
  • 1,154
0

How to "sleep" after every 8 threads?

So if you are doing something like this, then it isn't doing what you think.

for (int i = 1; i <= filesInDirectory; i++) {
    executorService.execute(new Importer(service));
    Thread.sleep(...);
}

This causes the thread which is starting the background jobs to sleep and does not affect the running on each of the jobs. I believe what you are missing is to wait for the thread-pool to finish:

 executorService.shutdown();
 executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);

This waits for all of the jobs in the thread-pool to complete before continuing.

One more thing. I use executorService.submit(...) versus execute(...). Here's a description of their difference. For me, one additional difference is that any exceptions thrown by tasks run with execute(...) cause the running thread to terminate and possibly be restarted. With submit(...) it allows you to get that exception if needed and stops the threads from having to be respawned unnecessarily.

If you explain a bit more about what you are trying to accomplish, we should be able to help.

Gray
  • 115,027
  • 24
  • 293
  • 354