I'm working on an application consisting of a REST API that exposes several endpoints. One of them is a kind of initialization operation: it loads the system data and can be called several times during the application lifecycle, removing the existing data and loading the new data. The rest of the endpoints perform operations over that data and introduce some additional data into the system, which is erased whenever the initialization endpoint is called again.

Since the application has to behave consistently under concurrency, I've decided to implement a Synchronizer class based on a ReentrantReadWriteLock, where only the initialization operation locks on the write lock and the rest of the operations lock on the read lock. However, I'm seeing weird behavior when testing my implementation, and I suspect there is something I don't fully understand, either about my implementation or about the test itself.
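For reference, the Synchronizer interface isn't shown here; judging from the implementation below, it just declares the four lifecycle methods (my reconstruction):

public interface Synchronizer {
    void startNonBlocking();
    void endNonBlocking();
    void startBlocking();
    void endBlocking();
}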
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class LockBasedSynchronizer implements Synchronizer {

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    @Override
    public void startNonBlocking() {
        lock.readLock().lock();    // shared: any number of non-blocking operations may hold it at once
    }

    @Override
    public void endNonBlocking() {
        lock.readLock().unlock();
    }

    @Override
    public void startBlocking() {
        lock.writeLock().lock();   // exclusive: blocks until every read lock has been released
    }

    @Override
    public void endBlocking() {
        lock.writeLock().unlock();
    }
}
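The endpoints are meant to use it roughly like this (a sketch only; repository, service, and the surrounding types are hypothetical names, and the try/finally ensures the lock is released even if the operation throws):

public void initialize(SystemData newData) {
    synchronizer.startBlocking();        // exclusive: no other operation runs meanwhile
    try {
        repository.deleteAll();          // hypothetical data-access calls
        repository.loadAll(newData);
    } finally {
        synchronizer.endBlocking();
    }
}

public Result performOperation(Request request) {
    synchronizer.startNonBlocking();     // shared: other read-locked operations proceed concurrently
    try {
        return service.handle(request);  // hypothetical business logic
    } finally {
        synchronizer.endNonBlocking();
    }
}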
import static java.lang.String.format;
import static java.util.Arrays.asList;
import static org.assertj.core.api.Assertions.assertThat;

import java.util.ArrayList;
import java.util.List;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import lombok.AllArgsConstructor;

class LockBasedSynchronizerTest {

    private LockBasedSynchronizer synchronizer;

    @BeforeEach
    void setUp() {
        synchronizer = new LockBasedSynchronizer();
    }

    @Test
    void itDoesntBlockNonBlockingOperations() {
        List<String> outputScraper = new ArrayList<>();
        List<Worker> threads = asList(
                new NonBlockingWorker(synchronizer, outputScraper, 300, "first"),
                new NonBlockingWorker(synchronizer, outputScraper, 100, "second"),
                new NonBlockingWorker(synchronizer, outputScraper, 10, "third")
        );

        threads.parallelStream().forEach(Worker::run); // this way it works
        // threads.forEach(Worker::run);               // this way it fails

        assertThat(outputScraper).containsExactly("third", "second", "first");
    }
}
@AllArgsConstructor
abstract class Worker extends Thread {

    protected Synchronizer synchronizer;
    private List<String> outputScraper;
    private int delayInMilliseconds;
    private String id;

    protected abstract void lock();

    protected abstract void unlock();

    @Override
    public void run() {
        try {
            System.out.println(format("Starting [%s]", id));
            sleep(delayInMilliseconds); // stagger the workers: shorter delays should reach the lock first
            lock();
            outputScraper.add(id);
            unlock();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
class NonBlockingWorker extends Worker {

    public NonBlockingWorker(Synchronizer synchronizer, List<String> outputScraper, int delayInMilliseconds, String id) {
        super(synchronizer, outputScraper, delayInMilliseconds, id);
    }

    @Override
    protected void lock() {
        synchronizer.startNonBlocking();
    }

    @Override
    protected void unlock() {
        synchronizer.endNonBlocking();
    }
}
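The blocking counterpart, which this particular test doesn't exercise, would be symmetric, delegating to the blocking pair instead:

class BlockingWorker extends Worker {

    public BlockingWorker(Synchronizer synchronizer, List<String> outputScraper, int delayInMilliseconds, String id) {
        super(synchronizer, outputScraper, delayInMilliseconds, id);
    }

    @Override
    protected void lock() {
        synchronizer.startBlocking();   // exclusive write lock
    }

    @Override
    protected void unlock() {
        synchronizer.endBlocking();
    }
}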
As you can see, when I run the workers through a parallel stream, the test passes as expected. But when I run them sequentially with forEach, they execute in the very same order they were submitted, as if everything ran on one single thread, and the test fails. Could someone explain why?
Also, another doubt that comes to my mind about this implementation: what happens if the blocking operation is waiting for the write lock while non-blocking operations keep arriving? Could it wait indefinitely, i.e., can the writer starve until the read requests stop coming?
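I know ReentrantReadWriteLock can also be constructed in fair mode, where waiting threads acquire the lock roughly in arrival order, so a waiting writer isn't overtaken by later readers. Would switching to it be the right way to prevent that, i.e.:

// fair mode: approximately arrival-order acquisition, so a queued writer
// blocks readers that arrive after it (my understanding of the javadoc)
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);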
Apart from that, if you can think of a better approach to the problem I've described, suggestions are more than welcome.