I have a scenario where I need to send 1M message to a server using a blocking API. The API does not accept batch request so I have to send 1M message one by one.
Instead of using one thread, I am thinking to use multiple threads to send them.
The caller has to wait for all 1M messages to be sent before proceeding.
My implementation is as follows:
public class MySender {
private final MyPublisher myPublisher;
private final ExecutorService threadPool;
private final Map<String, List<CompletableFuture<Void>>> jobMap = Maps.newConcurrentMap();
public MySender (final MyPublisher myPublisher,
ExecutorService threadPool) {
this.myPublisher= myPublisher;
this.threadPool = threadPool;
}
public void send(final MyData event) {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> doSubmit(event), threadPool);
List<CompletableFuture<Void>> futureList = jobMap.computeIfAbsent(event.getID(), entry -> new ArrayList<>());
futureList.add(future);
}
public void notifySendComplete(final String id) {
if(!jobMap.containsKey(id)) {
return;
}
jobMap.get(id).forEach(CompletableFuture::join);
jobMap.remove(id);
}
private void doSubmit(final MyData event) {
try {
....
myPublisher.send(event);
....
} catch(Exception e) {
// log error
}
}
}
The client class can simply use this class this way:
myInputList.forEach(input -> {
MyData event = createData(input);
mySender.send(event);
})
mySender.notifySendComplete();
I think this implementation will work, but the problem is obvious. It needs to hold 1M CompletableFuture in the map, which are not eligible for garbage collection.
Is it a big problem? If so, are there any better approaches?
Restriction:
- The Thread pool cannot be shut down
- I can implement it using CountDownLatch but it is not allowed to use in my project.