I'm trying to gather specific data on how long a task waits between being submitted and actually being executed. The idea is to be able to closely monitor the existing threadpool and tasks that are submitted for execution.
Let's assume I have an ExecutorService
with a fixedThreadPool
.
I'm also using a composition of completableFutures to perform a set of tasks asynchronously. I'd like to be able to track within my logs the exact time a certain task had to wait in the queue before being taken for execution. The way I see it I need two things:
- A way to label
CompletableFuture
(or theSupplier
functions passed toCompletableFuture.supplyAsync()
)
This I could potentially do by providing a wrapper method for the Supplier as mentioned here https://stackoverflow.com/a/57888886 and overwrite theCompletableFuture.supplyAsync()
method so it will internally log which named supplier was provided - A way to monitor the time between the submission and execution of a specific Runnable to the threadpool executor.
This I can achieve by extending theThreadPoolExecutor
and providing some custom logging in thebeforeExecute()
andexecute()
method
What I'm kind of 'stuck' on now is linking both of them together. The beforeExecute()
method override gives me the thread and the runnable - but the thread in itself doesn't tell me much yet, and the runnable isn't named in any way so I can't really know which exact task is taken for execution. Of course I can add additional logs in the task implementations themselves and then assume that they will be right next to the log from beforeExecute()
. The problem still remains for execute()
itself, since that one is called internally after using the CompletableFuture
composition.
So how can I properly link the information from the executor service, with some labelling of the exact tasks provided as CompletableFutures as in the example below?
List<Foo> results = createResults();
results.forEach(r -> CompletableFuture.completedFuture(r)
.thenCompose(result -> addSomething(result, something)
.thenCombine(addSomethingElse(result, somethingElse), (r1, r2) -> result)
.thenCompose(r -> doSomething(result).thenCompose(this::setSomething))
.thenApply(v -> result))));
...
// At some point join() is called to actually wait for execution and completion
listOfFutures.join()
And each of the functions called within return a CompletableFuture<Foo>
created by:
private CompletableFuture<Foo> setSomething(Foo foo) {
return CompletableFuture.supplyAsync(() -> {
foo.description = "Setting something";
return foo;
}, myExecutorService);
}
So even by wrapping the Supplier<T>
to have it labeled, how am I able to link this with the tracking within the execute()
and beforeExecute()
method of the ThreadPoolExecutor
when that one operates on Runnables instead of Suppliers?