
I'm working on an application consisting of a REST API that exposes several endpoints. One of them is a kind of initialization operation that loads the system data; it can be called several times during the application lifecycle, removing the existing data and loading the new data. The rest of the endpoints perform operations over that data and introduce some additional data into the system, which gets erased whenever the initialization endpoint is called. Since the application should behave consistently under a concurrent scenario, I've decided to implement a Synchronizer class based on a ReentrantReadWriteLock, where only the initialization operation locks on the write lock, and the rest of the operations lock on the read lock. However, I'm seeing some weird behavior when testing my implementation, and I suspect there is something I don't fully understand, either about my implementation or about the test itself.

import java.util.concurrent.locks.ReentrantReadWriteLock;

public class LockBasedSynchronizer implements Synchronizer {

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    @Override
    public void startNonBlocking() {
        lock.readLock().lock();
    }

    @Override
    public void endNonBlocking() {
        lock.readLock().unlock();
    }

    @Override
    public void startBlocking() {
        lock.writeLock().lock();
    }

    @Override
    public void endBlocking() {
        lock.writeLock().unlock();
    }
}


import static java.lang.String.format;
import static java.util.Arrays.asList;
import static org.assertj.core.api.Assertions.assertThat;

import java.util.ArrayList;
import java.util.List;

import lombok.AllArgsConstructor;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class LockBasedSynchronizerTest {

    private LockBasedSynchronizer synchronizer;

    @BeforeEach
    void setUp() {
        synchronizer = new LockBasedSynchronizer();
    }

    @Test
    void itDoesntBlockNonBlockingOperations() {
        List<String> outputScrapper = new ArrayList<>();
        List<Worker> threads = asList(
                new NonBlockingWorker(synchronizer, outputScrapper, 300, "first"),
                new NonBlockingWorker(synchronizer, outputScrapper, 100, "second"),
                new NonBlockingWorker(synchronizer, outputScrapper, 10, "third")
        );

        threads.parallelStream().forEach(Worker::run); // this way it works
        // threads.forEach(Worker::run); // this way it fails

        assertThat(outputScrapper).containsExactly("third", "second", "first");
    }    

    @AllArgsConstructor
    abstract class Worker extends Thread {
        protected Synchronizer synchronizer;
        private List<String> outputScraper;
        private int delayInMilliseconds;
        private String id;

        protected abstract void lock();

        protected abstract void unlock();

        @Override
        public void run() {
            try {
                System.out.println(format("Starting [%s]", id));
                sleep(delayInMilliseconds);
                lock();
                outputScraper.add(id);
                unlock();

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    class NonBlockingWorker extends Worker {

        public NonBlockingWorker(Synchronizer synchronizer, List<String> outputScraper, int delayInMilliseconds, String id) {
            super(synchronizer, outputScraper, delayInMilliseconds, id);
        }

        @Override
        protected void lock() {
            synchronizer.startNonBlocking();
        }

        @Override
        protected void unlock() {
            synchronizer.endNonBlocking();
        }
    }
}

As you can see, when I spawn the worker threads through a parallel stream, it works as expected, but if I run them sequentially with forEach, they don't work as expected: they execute in the very same order they were started, as if they were running on a single thread. Could someone explain why?

Also, another doubt that comes to my mind about this implementation: what would happen if the blocking operation is waiting for the write lock while non-blocking operations keep arriving? Would it wait indefinitely until the non-blocking operations stop arriving?

Apart from that, if you can think of any better solution for the problem described, suggestions are more than welcome.

beni0888
    It's the same old problem (https://stackoverflow.com/questions/8579657/whats-the-difference-between-thread-start-and-runnable-run): your `Worker` class extends `Thread`, but you never `start()` those threads. Instead your code calls the `run()` method itself, which means the `run()` method is executed synchronously. – Thomas Kläger Jul 07 '20 at 05:19

2 Answers


Firstly, the main issue with threads.forEach(Worker::run) is that it doesn't actually run anything concurrently: you're just iterating through your list of Workers and calling their run() methods, one after the other, on the calling thread.
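This is easy to reproduce with plain Runnables; in the following sketch (class and task names are made up for illustration), every task runs on the calling thread, in list order:

```java
import java.util.List;

public class SequentialRunDemo {
    public static void main(String[] args) {
        List<Runnable> tasks = List.of(
                () -> System.out.println("task 1 on " + Thread.currentThread().getName()),
                () -> System.out.println("task 2 on " + Thread.currentThread().getName())
        );
        // No new threads here: forEach just invokes run() on the current
        // thread, one task after the other.
        tasks.forEach(Runnable::run);
    }
}
```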

Now a few more points.

In general, don't extend Thread. Rather create some Runnable or Callable or some other interface to be run in a thread, usually via an Executor. You're not trying to re-define Thread; you're just trying to run something concurrently.

Also, with the code you have now, you're not making any use of extending Thread anyway, since you're just calling the run() method on the Workers in your stream, which could really be any method of whatever class is in your stream or list.

What I think would be best is to have Worker implement Runnable, instead of extending Thread, and feed those Workers into an Executor. Or simply applying run() to the elements of a parallelStream() is OK too, I guess; just be aware of what it is you're doing.
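A minimal sketch of that suggestion, assuming a Runnable-based worker and a fixed thread pool (names like PrintingWorker are illustrative, not from the original post):

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecutorDemo {
    // Hypothetical worker: implements Runnable instead of extending Thread.
    static class PrintingWorker implements Runnable {
        private final String id;

        PrintingWorker(String id) {
            this.id = id;
        }

        @Override
        public void run() {
            System.out.println(id + " running on " + Thread.currentThread().getName());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        // The executor decides which pool thread runs each worker.
        List.of("first", "second", "third")
                .forEach(id -> executor.submit(new PrintingWorker(id)));
        executor.shutdown();                            // stop accepting new tasks
        executor.awaitTermination(5, TimeUnit.SECONDS); // wait for submitted tasks
    }
}
```

The point of the Executor is that thread creation and lifecycle stay out of the worker class entirely; the worker only describes the work.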

xtratic

Calling Thread::run is blocking, and it doesn't create a new thread. You need to call Thread::start instead, and then wait (here, one second) for the workers to complete:

threads.forEach(Worker::start);
Thread.sleep(1000);
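As a design note, a fixed sleep can be replaced by joining each thread, which waits exactly until it finishes. A self-contained sketch of the start-then-wait pattern (names are illustrative, not from the original post):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class StartJoinDemo {
    public static void main(String[] args) throws InterruptedException {
        List<String> output = Collections.synchronizedList(new ArrayList<>());
        List<Thread> threads = List.of(
                new Thread(() -> output.add("a")),
                new Thread(() -> output.add("b"))
        );
        threads.forEach(Thread::start); // actually spawns the threads
        for (Thread t : threads) {
            t.join();                   // deterministic wait, no arbitrary sleep
        }
        System.out.println(output.size()); // prints 2
    }
}
```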
Marc