I'm experimenting with thread parking and decided to build a small service. Here is what it looks like:

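import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestService {
    // logback; I suspect this logger is what causes the trouble
    private static final Logger logger = LoggerFactory.getLogger(TestService.class);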

    private final CountDownLatch stopLatch;
    private final Object parkBlocker = new Object();
    private volatile boolean stopped;
    private final Thread[] workers;

    public TestService(int parallelism) {
        stopLatch = new CountDownLatch(parallelism);
        workers = new Thread[parallelism];
        for (int i = 0; i < parallelism; i++) {
            workers[i] = new Thread(() -> {
                try {
                    while (!stopped) {
                        logger.debug("Parking " + Thread.currentThread().getName());
                        LockSupport.park(parkBlocker);
                        logger.debug(Thread.currentThread().getName() + " unparked");
                    }
                } finally {
                    stopLatch.countDown();
                }
            });
        }
    }

    public void start() {
        Arrays.stream(workers).forEach(t -> {
            t.start();
            logger.debug(t.getName() + " started");
        });
    }

    public boolean stop(long timeout, TimeUnit unit) throws InterruptedException {
        this.stopped = true;
        unparkWorkers();
        // true if every worker finished before the timeout elapsed
        return stopLatch.await(timeout, unit);
    }

    private void unparkWorkers() {
        Arrays.stream(workers).forEach(w -> {
            LockSupport.unpark(w);
            logger.debug("Un-park call is done on " + w.getName());
        });
    }
}

The issue I ran into shows up when I test the service as follows:

public static void main(String[] args) throws InterruptedException {
    while (true) {
        TestService service = new TestService(2);
        service.start();
        // fail as soon as the workers do not stop within 10 seconds
        if (!service.stop(10000, TimeUnit.MILLISECONDS)) {
            throw new RuntimeException();
        }
    }
}

I sometimes got the following behavior:

14:58:55.226 [main] DEBUG com.pack.age.TestService - Thread-648 started
14:58:55.227 [Thread-648] DEBUG com.pack.age.TestService - Parking Thread-648
14:58:55.227 [main] DEBUG com.pack.age.TestService - Thread-649 started
14:58:55.227 [main] DEBUG com.pack.age.TestService - Un-park call is done on Thread-648
14:58:55.227 [Thread-648] DEBUG com.pack.age.TestService - Thread-648 unparked
14:58:55.227 [main] DEBUG com.pack.age.TestService - Un-park call is done on Thread-649
14:58:55.227 [Thread-649] DEBUG com.pack.age.TestService - Parking Thread-649
Exception in thread "main" java.lang.RuntimeException
    at com.pack.age.Test$.main(Test.scala:12)
    at com.pack.age.Test.main(Test.scala)

The thread hangs in park:

"Thread-649" #659 prio=5 os_prio=0 tid=0x00007efe4433f000 nid=0x7691 waiting on condition [0x00007efe211c8000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x0000000720739a68> (a java.lang.Object)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at com.pack.age.TestService.lambda$new$0(TestService.java:27)
    at com.pack.age.TestService$$Lambda$1/1327763628.run(Unknown Source)
    at java.lang.Thread.run(Thread.java:748)

I don't see any race between park and unpark in the service. Moreover, if unpark is called before park, the park is guaranteed not to block (that's what the javadocs say).
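The javadoc guarantee referred to above can be checked in isolation. The following is only a minimal sketch; the class name `SinglePermitDemo` and its method are made up for illustration and are not part of the service. It also shows that the permit does not accumulate: two `unpark` calls still leave a single permit.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

public class SinglePermitDemo {

    /** Returns how long the second, bounded park lasted, in milliseconds. */
    static long secondParkMillis() {
        Thread self = Thread.currentThread();
        LockSupport.unpark(self);   // makes the permit available
        LockSupport.unpark(self);   // permit was already available: nothing is added

        LockSupport.park();         // consumes the permit and returns without blocking

        long begin = System.nanoTime();
        // No permit is left, so this park normally waits for its timeout.
        // Bounded so the demo terminates; park may also return early for no reason.
        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100));
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - begin);
    }

    public static void main(String[] args) {
        System.out.println("second park lasted about " + secondParkMillis() + " ms");
    }
}
```

The first `park` returns at once because of the earlier `unpark`. The second one has no permit to consume, which is the situation the hanging worker appears to be in.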

Maybe I'm misusing LockSupport::park. Can you suggest a fix?

St.Antario
  • I don't seem to get that exception; I've been running your code for 3 minutes now... – daniu May 16 '18 at 12:36
  • @daniu Did you use log4j/logback? – St.Antario May 16 '18 at 12:37
  • no, actually, `java.util.logging` (don't have log4j here). Maybe it really is the problem (although I don't see how). – daniu May 16 '18 at 12:39
  • @daniu Actually, I first tried to just `System.out.println` all messages and the code worked fine. After switching to log4j/logback I started facing this issue. – St.Antario May 16 '18 at 12:40
  • @daniu It seems this is because of logback. It uses `ReentrantLock` for synchronization, which in turn parks threads while they wait (via `AbstractQueuedSynchronizer`). So `LockSupport::park` seems unsafe to use alongside "alien" code that can park/unpark threads itself. – St.Antario May 16 '18 at 12:46
  • That is weird, and if it's due to the use of `ReentrantLock`, it's an issue with that rather than with logback (which only introduces the problem in your case), so it's an actual JDK issue. After all, you might be using `ReentrantLock` in other (non-"alien") parts of the code and cause the same behavior. – daniu May 16 '18 at 12:54
  • @daniu What confuses me is that `unpark` does not accept a `blocker` object, just a thread. – St.Antario May 16 '18 at 12:59
  • Well you wouldn't be able to `unpark()` with a blocker object if you're using the same one (as you do). But in general I agree that `unpark()` accepts `Thread`, especially since the preferred Java concurrency handling uses `ExecutorService`s and does not involve the direct use of `Thread` itself at all anymore. That's probably also the way to solve your problem. – daniu May 16 '18 at 13:08
  • @daniu I can imagine a scenario where we unpark a worker so that it gains the permit. Then this worker blocks on a `ReentrantLock`, so the permit is lost. The next time the thread parks, it stays parked (the permit was lost in `ReentrantLock`). Is that correct? – St.Antario May 16 '18 at 13:25
  • @daniu Reproduced the behavior by replacing the logback calls with `lock.lock(); long l = 0; while(l++ < 100000000L){} lock.unlock();` – St.Antario May 16 '18 at 13:35
  • The `LockSupport` javadoc does state "Basic thread blocking primitives for creating locks and other synchronization classes". I guess the lesson is "don't use it along with locks" (and rather stay on the same synchronization level). – daniu May 16 '18 at 13:43
  • @daniu Sounds reasonable, thanks. – St.Antario May 16 '18 at 13:59
  • @daniu it's a [bit simpler](https://stackoverflow.com/a/61149733/1059372), I guess. – Eugene Apr 10 '20 at 22:45

1 Answer

This has nothing to do with the logger, though its usage brings the problem to the surface. You have a race condition, as simple as that. Before explaining that race condition, you need to understand a few things from the LockSupport::unpark documentation:

Makes available the permit for the given thread, if it was not already available. If the thread was blocked on park then it will unblock. Otherwise, its next call to park is guaranteed not to block.

The first point is explained here. The short version: if a thread has already been started but has not yet called park, and during that window (between the start of the thread and its call to park) some other thread calls unpark on it, that thread will not park at all. The permit will be available immediately. Maybe this little drawing makes it clearer:

(ThreadA)  start ------------------ park --------- ....

(ThreadB)  start ----- unpark -----

Notice how ThreadB calls unpark(ThreadA) in the window between ThreadA's start and its call to park. As such, when ThreadA reaches park, it is guaranteed not to block, exactly as the documentation says.
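The drawing above can be turned into a small runnable sketch. The class `UnparkBeforeParkDemo` and its flags are hypothetical names chosen for this illustration; the main thread plays the role of ThreadB. The worker waits with a busy loop rather than a latch, on the assumption that any blocking utility might park internally and use up the permit.

```java
import java.util.concurrent.locks.LockSupport;

public class UnparkBeforeParkDemo {
    private static volatile boolean workerRunning;
    private static volatile boolean unparkDone;
    private static volatile boolean passedPark;

    /** Returns true if ThreadA got past park() although unpark() came first. */
    static boolean run() {
        workerRunning = false;
        unparkDone = false;
        passedPark = false;

        Thread threadA = new Thread(() -> {
            workerRunning = true;
            // Busy-wait on purpose: a latch or lock could park internally
            // and consume the very permit this sketch is about.
            while (!unparkDone) {
                Thread.yield();
            }
            LockSupport.park();   // the permit is already there: returns at once
            passedPark = true;
        });
        threadA.setDaemon(true);  // do not keep the JVM alive if it ever hangs
        threadA.start();

        while (!workerRunning) {
            Thread.yield();       // wait until ThreadA is really running
        }
        LockSupport.unpark(threadA);  // ThreadA has started but not parked yet
        unparkDone = true;

        try {
            threadA.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return passedPark;
    }

    public static void main(String[] args) {
        System.out.println("ThreadA passed park(): " + run());
    }
}
```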

The second point from the same documentation is:

This operation is not guaranteed to have any effect at all if the given thread has not been started.

Let's see that via a drawing:

Thread B calls unpark(ThreadA) --- Thread A starts --- Thread A calls park 

After ThreadA calls park, it can hang forever: the early unpark is not guaranteed to have had any effect, and ThreadB never calls unpark on it again. Notice that the call to unpark was made before ThreadA had started (unlike the previous example).
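A sketch for experimenting with this second case follows. The class name `UnparkBeforeStartDemo` is made up, and the park is bounded with `parkNanos` so the program terminates either way. Since the documentation only says the early unpark is "not guaranteed" to have an effect, the outcome may differ between JVMs and is deliberately not asserted.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

public class UnparkBeforeStartDemo {
    private static volatile long waitedNanos = -1;

    /** Returns how long ThreadA stayed in its bounded park, in nanoseconds. */
    static long measureParkNanos() {
        Thread threadA = new Thread(() -> {
            long begin = System.nanoTime();
            // Bounded park: returns after 200 ms even if no permit ever arrives.
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(200));
            waitedNanos = System.nanoTime() - begin;
        });
        threadA.setDaemon(true);

        LockSupport.unpark(threadA);  // ThreadA is not started yet: may have no effect
        threadA.start();

        try {
            threadA.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return waitedNanos;
    }

    public static void main(String[] args) {
        long millis = TimeUnit.NANOSECONDS.toMillis(measureParkNanos());
        System.out.println("ThreadA stayed parked for about " + millis + " ms");
    }
}
```

On a JVM where the early unpark has no effect, the reported time is close to the 200 ms timeout; if the permit happened to be retained, it would be close to zero.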

And this is exactly what happens in your case:

LockSupport.unpark(w); (from unparkWorkers) is called before t.start(); (from public void start() {...}). In simpler words: your code calls unpark on both workers before they even start, so when they ultimately reach park they are stuck, and no one is able to unpark them. The fact that you see this with a logger and not with System::out most probably has to do with the fact that println uses a synchronized method under the hood.


As a matter of fact, LockSupport offers exactly the semantics needed to prove this. For this we need a blocker class like the following (and, for simplicity, a single worker: SOProblem service = new SOProblem(1);):

static class ParkBlocker {

    private volatile int x;

    public ParkBlocker(int x) {
        this.x = x;
    }

    public int getX() {
        return x;
    }
}

And now we need to use it in the proper methods (pb below is a ParkBlocker field of the service). First, flag the fact that we have called unpark:

private void unparkWorkers() {
    Arrays.stream(workers).forEach(w -> {
        LockSupport.unpark(w);
        logger.debug("Un-park call is done on " + w.getName());
    });
    /*
     * add "1" to whatever there is already in pb.x, meaning
     * we have done unparking _also_
     */
    int y = pb.x;
    y = y + 1;
    pb.x = y;
}

Then reset the flag after a cycle has ended:

public boolean stop(long timeout, TimeUnit unit) throws InterruptedException {
    boolean stoppedSuccessfully = false;
    stopped = true;
    unparkWorkers();
    if (stopLatch.await(timeout, unit)) {
        stoppedSuccessfully = true;
        // reset the flag
        pb.x = 0;
    }
    return stoppedSuccessfully;
}

Then change the constructor to flag that the thread has started:

  .....
  while (!stopped) {
       logger.debug("Parking " + Thread.currentThread().getName());
       // flag the fact that thread has started. add "2", meaning
       // thread has started
       int y = pb.x;
       y = y + 2;
       pb.x = y;
       LockSupport.park(pb);
       logger.debug(Thread.currentThread().getName() + " unparked");
  }

Then, when your thread freezes you need to inspect the flag:

 public static void main(String[] args) throws InterruptedException {
    while (true) {
        SOProblem service = new SOProblem(1); // <-- notice a single worker, for simplicity
        service.start();
        if (!service.stop(10000, TimeUnit.MILLISECONDS)) {
            service.debug();
            throw new RuntimeException();
        }
    }
}

where the debug method is:

public void debug() {
    Arrays.stream(workers)
          .forEach(x -> {
              ParkBlocker pb = (ParkBlocker) LockSupport.getBlocker(x);
              if (pb != null) {
                  System.out.println("x = " + pb.getX());
              }
          });
}

When the issue reproduces, you have called unpark before you called park; that is the case when the output is x = 3.
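Whichever way the permit gets lost, the LockSupport javadoc says park should be invoked within a loop that rechecks the condition. A minimal sketch of the service under that rule follows, assuming nothing that can block or park runs between the check of `stopped` and the call to `park`. The class name `ParkLoopSketch` and the helper `runCycles` are hypothetical, and logging is left out of the worker loop on purpose.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

public class ParkLoopSketch {
    private final Object parkBlocker = new Object();
    private final CountDownLatch stopLatch;
    private final Thread[] workers;
    private volatile boolean stopped;

    public ParkLoopSketch(int parallelism) {
        stopLatch = new CountDownLatch(parallelism);
        workers = new Thread[parallelism];
        for (int i = 0; i < parallelism; i++) {
            workers[i] = new Thread(() -> {
                try {
                    // The flag is re-read right before every park, and no call
                    // that might park internally sits between check and park.
                    while (!stopped) {
                        LockSupport.park(parkBlocker);
                    }
                } finally {
                    stopLatch.countDown();
                }
            });
            workers[i].setDaemon(true);
        }
    }

    public void start() {
        for (Thread w : workers) {
            w.start();
        }
    }

    public boolean stop(long timeout, TimeUnit unit) throws InterruptedException {
        stopped = true;                 // publish the state change first
        for (Thread w : workers) {
            LockSupport.unpark(w);      // then wake every worker
        }
        return stopLatch.await(timeout, unit);
    }

    /** Runs start/stop cycles; returns false as soon as a stop times out. */
    static boolean runCycles(int cycles) {
        try {
            for (int i = 0; i < cycles; i++) {
                ParkLoopSketch service = new ParkLoopSketch(2);
                service.start();
                if (!service.stop(10, TimeUnit.SECONDS)) {
                    return false;
                }
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("all cycles stopped in time: " + runCycles(1000));
    }
}
```

If logging is needed, it is safer to place it where a lost permit cannot matter, for example after the loop has exited.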

Eugene