I have a resource-intensive task that I want to run asynchronously, so I want a single-thread executor that starts the next task only once the previous one has completed.

My main application thread may submit this task many times.

I wrote a simple test, and instead of one task running on a single thread every 5 seconds (the artificial delay I added to simulate the work), all the tasks run at the same time on different threads.

Here is my class:

public class NoTypeHandlerService {


    private Executor pool;

    private static NoTypeHandlerService instance = null;

    @Getter
    private int SECONDS_TO_WAIT_BETWEEN_JOBS = 5;

    private NoTypeHandlerService() {
        pool = Executors.newSingleThreadExecutor();

    }

    public static NoTypeHandlerService getInstance() {
        if (instance == null) {
            instance = new NoTypeHandlerService();
        }
        return instance;
    }

    public void assignType(ComplianceCandidate candidate) {

        CompletableFuture.supplyAsync(()-> candidate, pool).thenApplyAsync(AssignType::doWork);
    }

}

The AssignType class is the task:

public class AssignType {

    public static ComplianceCandidate doWork(ComplianceCandidate candidate) {
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            log.error("Could not execute type assignation job for: {}", candidate);
        }
        log.debug("Proceeding to assign type for: {}", candidate);

        return candidate;

    }
    
}

Here is my simple test:

   @Test
    @DisplayName("Creates fake candidates and make type service")
    public void testTypeAssignationTest() throws InterruptedException {
        ComplianceDevice device = new ComplianceDevice("0X123456", "99.6.6.1", 1, 1);
        ComplianceDevice device1 = new ComplianceDevice("0X1234567", "99.6.6.1", 1, 1);
        ComplianceDevice device2 = new ComplianceDevice("0X12345678", "99.6.6.1", 1, 1);
        ComplianceCandidate candidate = new ComplianceCandidate(device);
        ComplianceCandidate candidate1 = new ComplianceCandidate(device1);
        ComplianceCandidate candidate2 = new ComplianceCandidate(device2);
        NoTypeHandlerService.getInstance().assignType(candidate);
        NoTypeHandlerService.getInstance().assignType(candidate1);
        NoTypeHandlerService.getInstance().assignType(candidate2);
        log.debug("Successfully entered all jobs to execution");

        TimeUnit.SECONDS.sleep(15);

    }

When I run this, I see the following logs:

2020-07-26 21:36:50 [main] - Successfully entered all jobs to execution
2020-07-26 21:36:55 [ForkJoinPool.commonPool-worker-7] - Proceeding to assign type for: ComplianceCandidate [device=[ipAddress=99.6.6.1, macAddress=0X12345678, type=null]]
2020-07-26 21:36:55 [ForkJoinPool.commonPool-worker-5] - Proceeding to assign type for: ComplianceCandidate [device=[ipAddress=99.6.6.1, macAddress=0X1234567, type=null]]
2020-07-26 21:36:55 [ForkJoinPool.commonPool-worker-3] - Proceeding to assign type for: ComplianceCandidate [device=[ipAddress=99.6.6.1, macAddress=0X123456, type=null]]

This is not the behavior I was hoping for. The main thread is indeed not blocked, but instead of seeing a single thread (for example ForkJoinPool.commonPool-worker-7) print one line every 5 seconds, I'm seeing three different worker threads all print at the same time.

naoru
  • You'll need to make `NoTypeHandlerService.getInstance()` synchronised if you want to guarantee that you only create one executor once you start using it in a multithreaded environment. That's not your current issue though. – tgdavies Jul 27 '20 at 03:04
  • Have a look at the Javadoc for `CompletableFuture.supplyAsync()` – tgdavies Jul 27 '20 at 03:08
  • In `CompletableFuture.supplyAsync(()-> candidate, pool)`, `pool` is the single-thread pool I've created – naoru Jul 27 '20 at 13:40
  • The `CompletableFuture` does not remember nor reuse the pool. `supplyAsync(()-> candidate, pool)` is correct, but the subsequent `thenApplyAsync(AssignType::doWork)` will use the default pool. You again have to use the overload accepting an `Executor` (`thenApplyAsync(Function,Executor)`), each time you use one of the `…Async` methods. Only with Java 9+ can you override the [`defaultExecutor()`](https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/CompletableFuture.html#defaultExecutor()) method. – Holger Jul 28 '20 at 17:18
  • @naoru Sorry, I missed that for some reason. Holger's comment should solve your problem. – Slaw Jul 28 '20 at 18:22
  • @Holger When overriding `defaultExecutor()` one also has to override [`newCompletableFuture()`](https://docs.oracle.com/en/java/javase/14/docs/api/java.base/java/util/concurrent/CompletableFuture.html#newIncompleteFuture()), correct? At least if one wants later stages to continue to use the custom default executor? – Slaw Jul 28 '20 at 18:24
  • @Slaw yes, exactly. The method’s name is `newIncompleteFuture()` though. Like in [this answer](https://stackoverflow.com/a/56356109/2711488). – Holger Jul 28 '20 at 18:30
  • Can you write an implementation showing how to override it, as an answer? – naoru Jul 28 '20 at 21:03
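
Following up on the comments above, here is a minimal sketch of both suggestions, not a definitive implementation. Option 1 passes the same executor to every `…Async` stage. Option 2 (Java 9+ only) subclasses `CompletableFuture` and overrides `defaultExecutor()` and `newIncompleteFuture()` so that dependent stages keep using the custom pool. The names `SingleThreadStagesDemo`, `PooledFuture`, `supply`, `tag`, `newNamedPool` and the thread name `type-worker` are made up for illustration and do not come from the question's code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical demo class; all names here are illustrative assumptions.
class SingleThreadStagesDemo {

    // Single-thread pool with a recognizable thread name (daemon so the JVM can exit).
    static ExecutorService newNamedPool() {
        return Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "type-worker");
            t.setDaemon(true);
            return t;
        });
    }

    // Stand-in for the real work: appends the name of the thread that ran it.
    static String tag(String s) {
        return s + "@" + Thread.currentThread().getName();
    }

    // Option 1: pass the executor to EVERY ...Async stage, not only the first one.
    static CompletableFuture<String> explicitExecutor(String input, Executor pool) {
        return CompletableFuture
                .supplyAsync(() -> input, pool)
                .thenApplyAsync(SingleThreadStagesDemo::tag, pool);
    }

    // Option 2 (Java 9+): a future whose default executor is the custom pool.
    static class PooledFuture<T> extends CompletableFuture<T> {
        private final Executor pool;

        PooledFuture(Executor pool) {
            this.pool = pool;
        }

        // Used by the ...Async methods that take no explicit executor.
        @Override
        public Executor defaultExecutor() {
            return pool;
        }

        // Dependent stages are created through this method, so they inherit the pool.
        @Override
        public <U> CompletableFuture<U> newIncompleteFuture() {
            return new PooledFuture<>(pool);
        }

        // Hypothetical factory: completeAsync(supplier) runs on defaultExecutor().
        static <U> CompletableFuture<U> supply(Supplier<U> supplier, Executor pool) {
            return new PooledFuture<U>(pool).completeAsync(supplier);
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = newNamedPool();
        System.out.println(explicitExecutor("a", pool).join());
        System.out.println(PooledFuture.supply(() -> "b", pool)
                .thenApplyAsync(SingleThreadStagesDemo::tag) // no executor argument needed
                .join());
        pool.shutdown();
    }
}
```

Applied to the question's code, Option 1 would presumably amount to changing the second stage to `thenApplyAsync(AssignType::doWork, pool)`, so that each job waits its turn on the single worker thread instead of running on the common pool.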

0 Answers