I came across a Java concurrency problem which I thought I had solved, but now I feel that my solution has a problem.
The problem:
Implement the missing methods in a thread-safe and performant way. The system should subscribe only on the first request per T key
and should unsubscribe once no more listeners remain for a given key.

    interface Listener {
        void onData();
    }

    abstract class Publisher<T> {
        public void subscribe(T key, Listener l) {
            // TODO complete
        }

        public void unsubscribe(T key, Listener l) {
            // TODO complete
        }

        public void publish(T key) {
            // TODO complete
        }

        public abstract void reallyLongSubscribeRequest(T key);
        public abstract void reallyLongUnsubscribeRequest(T key);
    }

My solution was to store the keys and listeners in a ConcurrentHashMap
and use a thread pool executor to run the calls to the really long subscribe and unsubscribe methods:

    abstract class Publisher<T> {
        private final int POOL_SIZE = 5;
        private final ConcurrentHashMap<T, List<Listener>> listeners = new ConcurrentHashMap<>();
        private final ScheduledExecutorService stpe = Executors.newScheduledThreadPool(POOL_SIZE);

        public void subscribe(T key, Listener l) {
            if (listeners.containsKey(key)) {
                listeners.get(key).add(l);
            } else {
                final T keyToAdd = key;
                final List<Listener> list = new LinkedList<Listener>();
                list.add(l);
                Runnable r = new Runnable() {
                    public void run() {
                        reallyLongSubscribeRequest(keyToAdd);
                        listeners.putIfAbsent(keyToAdd, list);
                    }
                };
                stpe.execute(r);
            }
        }

        public void unsubscribe(T key, Listener l) {
            if (listeners.containsKey(key)) {
                List<Listener> list = listeners.get(key);
                list.remove(l);
                if (list.size() == 0) {
                    final T keyToRemove = key;
                    Runnable r = new Runnable() {
                        public void run() {
                            reallyLongUnsubscribeRequest(keyToRemove);
                        }
                    };
                    stpe.execute(r);
                }
            }
        }

        public void publish(T key) {
            if (listeners.containsKey(key)) {
                final List<Listener> list = listeners.get(key);
                for (Listener l : list) {
                    l.onData();
                }
            }
        }

        public abstract void reallyLongSubscribeRequest(T key);
        public abstract void reallyLongUnsubscribeRequest(T key);
    }

I'm now concerned that this is not thread-safe because:
1. In subscribe, the active thread could be swapped out (have its time slice ended) between entering the false branch and executing the Runnable. If the next thread makes the same call (same key), then two threads will both want to subscribe and write to the map. putIfAbsent keeps the map consistent, but the really long method will get called twice (this is bad if it changes the class state).
2. Similar to #1, in unsubscribe, what if the thread is swapped out between entering the true branch of the nested if and executing the Runnable?
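
The interleaving described in the first concern (two threads in subscribe for the same key) can be forced deterministically instead of waiting for an unlucky time slice. The following is a minimal sketch, not the code from the question: the class name CheckThenActRace and the method slowRequestCalls are made up for illustration, the slow request is replaced by a counter, and it runs inline rather than on an executor. A CyclicBarrier holds both threads between the containsKey check and the putIfAbsent call.

```java
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; it mirrors the check-then-act shape of subscribe().
class CheckThenActRace {

    // Returns how many times the stand-in for reallyLongSubscribeRequest ran.
    static int slowRequestCalls() {
        ConcurrentHashMap<String, List<String>> listeners = new ConcurrentHashMap<>();
        AtomicInteger slowCalls = new AtomicInteger();
        // Releases both threads only once each has passed the containsKey check.
        CyclicBarrier bothPastCheck = new CyclicBarrier(2);

        Runnable subscribe = () -> {
            if (!listeners.containsKey("key")) {                  // check
                try {
                    bothPastCheck.await();                        // force the bad interleaving
                } catch (InterruptedException | BrokenBarrierException e) {
                    throw new IllegalStateException(e);
                }
                slowCalls.incrementAndGet();                      // stand-in for the slow request
                listeners.putIfAbsent("key", new LinkedList<>()); // act
            }
        };

        Thread a = new Thread(subscribe);
        Thread b = new Thread(subscribe);
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return slowCalls.get();
    }

    public static void main(String[] args) {
        System.out.println(slowRequestCalls()); // prints 2
    }
}
```

Both threads see an empty map before either one writes to it, so the stand-in for the slow request runs twice even though the map ends up with a single entry for the key.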
So my questions are:
- Are my above concerns valid, or am I overcomplicating the matter (or have I misunderstood how time slicing works)?
- If yes, can this be easily fixed, or is there a much better/easier/simpler solution?
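
For comparison, here is a rough sketch of one direction that might close both gaps: doing the check and the update in a single atomic step with ConcurrentHashMap.compute and computeIfPresent. The names AtomicPublisher and AtomicPublisherDemo are hypothetical, and the sketch assumes it is acceptable to run the slow request inside the compute call. That trades some throughput for simplicity, because other updates that hit the same key (or the same hash bin) wait while the request runs.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;

interface Listener {
    void onData();
}

// Hypothetical variant of Publisher; the name and design are assumptions.
abstract class AtomicPublisher<T> {
    private final ConcurrentHashMap<T, Set<Listener>> listeners = new ConcurrentHashMap<>();

    public void subscribe(T key, Listener l) {
        // compute() is atomic per key, so only the first caller sees null.
        listeners.compute(key, (k, set) -> {
            if (set == null) {
                reallyLongSubscribeRequest(k); // other updates to this key wait here
                set = new CopyOnWriteArraySet<>();
            }
            set.add(l);
            return set;
        });
    }

    public void unsubscribe(T key, Listener l) {
        listeners.computeIfPresent(key, (k, set) -> {
            set.remove(l);
            if (!set.isEmpty()) {
                return set;
            }
            reallyLongUnsubscribeRequest(k);
            return null; // returning null removes the key from the map
        });
    }

    public void publish(T key) {
        Set<Listener> set = listeners.get(key);
        if (set != null) {
            for (Listener l : set) { // iterates over a snapshot, no lock held
                l.onData();
            }
        }
    }

    public abstract void reallyLongSubscribeRequest(T key);
    public abstract void reallyLongUnsubscribeRequest(T key);
}

// Hypothetical demo: returns {subscribe requests, unsubscribe requests, deliveries}.
class AtomicPublisherDemo {
    static int[] run(int threads) {
        AtomicInteger subs = new AtomicInteger();
        AtomicInteger unsubs = new AtomicInteger();
        AtomicInteger delivered = new AtomicInteger();
        AtomicPublisher<String> p = new AtomicPublisher<String>() {
            public void reallyLongSubscribeRequest(String key) { subs.incrementAndGet(); }
            public void reallyLongUnsubscribeRequest(String key) { unsubs.incrementAndGet(); }
        };
        Listener[] ls = new Listener[threads];
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            Listener l = new Listener() {
                public void onData() { delivered.incrementAndGet(); }
            };
            ls[i] = l;
            ts[i] = new Thread(() -> p.subscribe("key", l));
            ts[i].start();
        }
        for (Thread t : ts) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        p.publish("key");                 // reaches every listener once
        for (Listener l : ls) {
            p.unsubscribe("key", l);      // the last removal triggers the unsubscribe request
        }
        p.publish("key");                 // no listeners left, nothing delivered
        return new int[] { subs.get(), unsubs.get(), delivered.get() };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run(8))); // prints [1, 1, 8]
    }
}
```

With eight threads subscribing to the same key, the slow subscribe request runs once, and the slow unsubscribe request runs once when the last listener is removed. If blocking callers during the slow request is not acceptable, this sketch does not apply as written and the request would have to be moved off the calling thread some other way.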