I have some Java app using Spring Batch. I've got a table used as a queue which contains information on jobs that were requested by clients (as a client requests for a task to be executed, a row is added to this queue).
In one of my classes a while loop is run until someone deactivates some flag :
protected void runJobLaunchingLoop() {
while (!isTerminated()) {
try {
if (isActivated()) {
QueueEntryDTO queueEntry = dequeueJobEntry();
launchJob(queueEntry);
}
}
catch (EmptyQueueException ignored) {}
catch (Exception exception) {
logger.error("There was a problem while de-queuing a job ('" + exception.getMessage() + "').");
}
finally {
pauseProcessor();
}
}
}
The pauseProcessor()
method calls Thread.sleep(). When I run this app in a Docker container it looks like the number of threads run by the application keep on increasing. The threads have the name "Timer-X" with X some integer that auto-increments.
I looked at the stack trace of one of these :
"Timer-14" - Thread t@128
java.lang.Thread.State: WAITING
at java.base@11.0.6/java.lang.Object.wait(Native Method)
- waiting on <25e60c31> (a java.util.TaskQueue)
at java.base@11.0.6/java.lang.Object.wait(Unknown Source)
at java.base@11.0.6/java.util.TimerThread.mainLoop(Unknown Source)
- locked <25e60c31> (a java.util.TaskQueue)
at java.base@11.0.6/java.util.TimerThread.run(Unknown Source)
Locked ownable synchronizers:
- None
Any idea what could be the cause of this? I'm not sure but if I don't run the app in a container but locally from IntelliJ, it seems like the problem does not occur. I'm not sure because sometimes it takes a while before thread count starts increasing.
EDIT : Some relevant code ...
protected QueueEntryDTO dequeueJobEntry() {
Collection<QueueEntryDTO> collection = getQueueService().dequeueEntry();
if (collection.isEmpty())
throw new EmptyQueueException();
return collection.iterator().next();
}
@Transactional
public Collection<QueueEntryDTO> dequeueEntry() {
Optional<QueueEntry> optionalEntry = this.queueEntryDAO.findTopByStatusCode(QueueStatusEnum.WAITING.getStatusCode());
if (optionalEntry.isPresent()) {
QueueEntry entry = (QueueEntry)optionalEntry.get();
QueueEntry updatedEntry = this.saveEntryStatus(entry, QueueStatusEnum.PROCESSING, (String)null);
return Collections.singleton(this.queueEntryDTOMapper.toDTO(updatedEntry));
} else {
return new ArrayList();
}
}
private void pauseProcessor() {
try {
Long sleepDuration = generalProperties.getQueueProcessingSleepDuration();
sleepDuration = Objects.requireNonNullElseGet(
sleepDuration,
() -> Double.valueOf(Math.pow(2.0, getRetries()) * 1000.0).longValue());
Thread.sleep(sleepDuration);
if (getRetries() < 4)
setRetries(getRetries() + 1);
}
catch (Exception ignored) {
logger.warn("Failed to pause job queue processor.");
}
}