2

I came across a Java concurrency problem which I thought I had solved but now I feel that My solution has a problem.

The problem:

Implement the missing methods in a thread safe and performant way. The system should subscribe only on the first request per T key and should unsubscribe once no more listeners remain for a given key.

interface Listener {
    void onData();
}

abstract class Publisher<T> {       
    public void subscribe(T key, Listener l) {
        // TODO complete
    }

    public void unsubscribe(T key, Listener l) {
        // TODO complete
    }

    public void publish(T key) {
        // TODO complete
    }

    public abstract void reallyLongSubscribeRequest(T key);
    public abstract void reallyLongUnsubscribeRequest(T key);
}

My solution was to store the the key's and listeners in a ConcurrentHashMap and use a thread pool executor to run a call to the really long subscribe and unsubcribe methods:

abstract class Publisher<T> {
    private final int POOL_SIZE = 5;

    private final ConcurrentHashMap<T, List<Listener>> listeners = new ConcurrentHashMap<>();
    private final ScheduledExecutorService stpe = Executors.newScheduledThreadPool(POOL_SIZE);

    public void subscribe(T key, Listener l) {
        if (listeners.containsKey(key)) {
            listeners.get(key).add(l);
        } else {
            final T keyToAdd = key;
            final List<Listener> list = new LinkedList<Listener>();
            list.add(l);

            Runnable r = new Runnable() {
                public void run() {
                    reallyLongSubscribeRequest(keyToAdd);
                    listeners.putIfAbsent(keyToAdd, list);
                }
            };

            stpe.execute(r);
        }
    }

    public void unsubscribe(T key, Listener l) {
        if (listeners.containsKey(key)) {
            List<Listener> list = listeners.get(key);
            list.remove(l);

            if (list.size() == 0) {
                final T keyToRemove = key;
                Runnable r = new Runnable() {
                    public void run() {
                        reallyLongUnsubscribeRequest(keyToRemove);
                    }
                };
                stpe.execute(r);
            }
        }
    }

    public void publish(T key) {
        if (listeners.containsKey(key)) {
            final List<Listener> list = listeners.get(key);
            for (Listener l : list) {
                l.onData();
            }
        }
    }

    public abstract void reallyLongSubscribeRequest(T key);
    public abstract void reallyLongUnsubscribeRequest(T key);
}

I'm now concerned that this is no longer thread safe because

  1. In subscribe the active thread could be swapped out/have its timeslice ended between entering the false branch and executing the Runnable. If the next thread makes the same call (same key) then we will have two threads wanting to subscribe and write to the map. putIfAbsent keeps the map consistent but the really long method will get called twice (this is bad if it changes the class state).

  2. Similar to #1, in unssubscribe what if the thread is swapped out between entering the true branch of the nested if and executing the Runnable?

So my questions are

  1. Are my above concerns valid or am I over complicating the matter (or have I misunderstood how time slicing works)?
  2. If yes, can this be easily fixed or there a much better/easier/simpler solution?
clicky
  • 865
  • 2
  • 14
  • 31

2 Answers2

2

You've got two problems:

  • Your subscribe, unsubscribe and publish methods should be synchronized to make them thread safe.

  • You should have have just a single thread to do the reallyLong...() calls that waits on a Queue. You post to the Queue a message telling is to do one or the other, and it does. The queue will ensure that they happen one after the other.

You also have a bug in your code. You only do the reallyLongSubscribeRequest(...) when the key doesn't exist in the map, but you are not removing the key from the map when you remove the last listener.

Jamie Cockburn
  • 7,379
  • 1
  • 24
  • 37
  • Wouldn't this block the thread until `reallyLong...()` returns, reducing performance? – clicky Jul 08 '14 at 14:01
  • I mean a single additional thread, hang on, I'll do a wee example. – Jamie Cockburn Jul 08 '14 at 14:02
  • 1
    The `reallyLong...()`methods would run serially on one thread, separate from the main thread doing the (un)subscribe operations. The key here is that it is **very** important that the `reallyLong...()` methods happen in order, or at least in order per key. Operations for different keys could be made to run in parallel, which would mean an operation for key A would not hold up another operation for key B. – Jamie Cockburn Jul 08 '14 at 15:37
  • @clicky I've also added a note to the answer describing a bug you have. – Jamie Cockburn Jul 08 '14 at 16:20
  • Thank you, I will use a BlockingQueue. – clicky Jul 09 '14 at 11:39
  • 1
    @clicky if you are going for the single thread option (so all `reallyLong...` operations run serially) you can continue to use the `ExecutorService`, just use the `Executors.newSingleThreadExecutor()`. You only need a custom solution if you're going to attempt serialising the operations per-key. You can see [this question](http://stackoverflow.com/questions/7192223/ensuring-task-execution-order-in-threadpool) for how to do that. – Jamie Cockburn Jul 09 '14 at 13:18
1

You have a couple of problems. Your lists are not thread-safe, and you are correct, you could run a request multiple times. The ConcurrentHashMap is a nice way to get parallel but thread-safe map access. however, you need to implement some sort of "per-key" synchronization to ensure that the (un)subscribe operations happen correctly (not to mention the list updates).

jtahlborn
  • 52,909
  • 5
  • 76
  • 118