8

I don't understand the value of 'a' is 0,Why is 'a' not 10,What is the running process of that code,Is it necessary to analyze from Java Memory Model? Here is my test code

package com.study.concurrent.demo;

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

//@SpringBootTest
@Slf4j
class DemoApplicationTests {

    int a = 0;
    int b = 0;
    @Test
    void contextLoads() {
        ExecutorService executorService = Executors.newFixedThreadPool(1);
//        final Semaphore semaphore = new Semaphore(3);
        for (int i = 0; i < 10; i++) {
            executorService.execute(() -> {
//                try {
//                    semaphore.acquire();
//                } catch (InterruptedException e) {
//                    e.printStackTrace();
//                }
                add();
                bdd();
//              log.info("a: {},concurrent_id: {}",a,Thread.currentThread().getName());
//                semaphore.release();
            });
        }
        executorService.shutdown();
        log.info("The final value of a:{}",a);
        log.info("The final value of b:{}",b);
    }


    public void add(){
        a++;
    }
    public void bdd(){
        b++;
    }

}

Basil Bourque
  • 303,325
  • 100
  • 852
  • 1,154
developYan
  • 83
  • 3
  • As an aside, you should (almost) always release the `semaphore` in a `finally` block, to make sure you don't accidentally forget to release it. – Andy Turner Feb 08 '21 at 16:49
  • 4
    Try using `submit()` instead of `execute()`. Submit returns a `Future` that allows you to wait for the thread to finish (or can even just get a value returned by the thread). – markspace Feb 08 '21 at 16:51
  • @markspace another reason to use `submit` to submit a `Callable` (as alluded to in the last sentence) is that `Callable`s allow checked exceptions to be thrown, so you don't have to worry about handling the `InterruptedException` explicitly. – Andy Turner Feb 08 '21 at 17:05
  • @developYan, there are visibility, atomicity and happens-before issues in the code. I have added an answer to handle all these 3 conditions. – Thiyanesh Feb 08 '21 at 17:59
  • Thank you for your answers, even if they are not adopted, . Your answers have relieved my confusion – developYan Feb 09 '21 at 03:24

4 Answers4

10

Two reasons:

  1. You're not waiting for the threads to finish, you're just shutting down the thread pool (that is: causing the thread pool to reject new tasks but continue to process existing tasks).

  2. You're not establishing a happens-before relationship between the writes in the thread pool and the read in the main thread.

    You could do this by (amongst other methods):

    1. Acquiring the semaphore before reading a;
    2. By using submit instead of execute to get a Future<?> for each of the tasks submitted, and invoking the Future.get() method on all of the returned futures. It is documented in the Javadoc of ExecutorService that this establishes a happens-before.

The first point is the "main" reason why a is coming out as zero: if I run it locally, and wait for the the thread pool to terminate, a comes out to 10.

However, just because it comes out as 10 doesn't mean the code works correctly without paying attention to the second point: you need to apply the Java Memory Model to have guarantees of correct functioning.

Andy Turner
  • 137,514
  • 11
  • 162
  • 243
  • there is no happens-before relationship or atomicity in the read and writes within thread pool threads. So, we may need to fix the update operation of `a` and `b` to have a predictable behavior. – Thiyanesh Feb 08 '21 at 18:05
  • Thank you for your answer,I've forgotten that multithreading and the main thread execute asynchronously,Multithreading is also random and the value of 'b' of 10 is not necessarily true – developYan Feb 09 '21 at 03:17
5

Issues

  1. Visibility - Multiple threads are accessing the same variable and the code does not have any visibility guarantees

  2. volatile can help with visibility guarantee

  3. Atomicity - Multiple threads are updating through a++ or b++ operations. These are not atomic operations. This is primarily set of operations 1. fetch a. 2. increment a. 3. update a. A context switch can happen in any of these states and result in incorrect value.

  4. So volatile visibility alone is not enough for correctness

  5. Use AtomicInteger to guarantee atomicity of the increment operation

  6. AtomicXXX can guarantee atomicity of a single operation

  7. If there was a need to increment both a and b together, then some form of synchronization is needed

  8. Communication - This is not communication between the main thread and executor task threads to communicate completion events

  9. executorService.shutdown() will not ensure this communication

  10. Latch can be used for this communication

  11. Or as mentioned by Andy, Future can be used

An example code with AtomicInteger and Latch

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class DemoApplicationTests {
    final AtomicInteger a = new AtomicInteger(0);
    final AtomicInteger b = new AtomicInteger(0);

    void contextLoads() throws Exception {
        CountDownLatch latch = new CountDownLatch(10);
        ExecutorService executorService = Executors.newFixedThreadPool(1);
        for (int i = 0; i < 10; i++) {
            executorService.execute(() -> {
                add();
                bdd();
                latch.countDown();
            });
        }
        latch.await();
        executorService.shutdown();
        System.out.println("The final value of a:" + a);
        System.out.println("The final value of b:" + b);
    }

    public void add() {
        a.incrementAndGet();
    }
    public void bdd() {
        b.incrementAndGet();
    }

    public static void main(String[] args) throws Exception {
        new DemoApplicationTests().contextLoads();
    }
}

An incorrect solution with threadpool size > 1 and CompletableFuture due to race conditions in a++, b++.

The following can(my knowledge is limited and can't confirm either way) be a perfectly legal code for a thread pool size of 1 (copied from Eugene's answer)

But when the same code was executed with thread pool size > 1, it will result in race conditions. (again the intention is to discuss about multiple threads and data visibility issues as is and not to project Eugene's answer as incorrect. Eugene's answer is in the context of single thread in threadpool and might be perfectly valid for single threaded threadpool scenario)

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DemoApplicationTests {
    int a = 0;
    int b = 0;

    void contextLoads() throws Exception {
        final int count = 10000;
        ExecutorService executorService = Executors.newFixedThreadPool(100);
        List<Runnable> list = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            Runnable r = () -> {
                add();
                bdd();
            };
            list.add(r);
        }

        CompletableFuture<?>[] futures = list.stream()
            .map(task -> CompletableFuture.runAsync(task, executorService))
            .toArray(CompletableFuture[]::new);

        CompletableFuture.allOf(futures).join();

        executorService.shutdown();
        System.out.println("The final value of a: " + a);
        System.out.println("The final value of b:" + b);
    }

    public void add() {
        a++;
    }
    public void bdd() {
        b++;
    }

    public static void main(String[] args) throws Exception {
        new DemoApplicationTests().contextLoads();
    }
}

Thank you @Basil Bourque for fixing the grammatical errors

Thiyanesh
  • 2,360
  • 1
  • 4
  • 11
  • your example with `AtomicInteger` is correct, but why do you need a `CountDownLatch` there? – Eugene Feb 09 '21 at 05:13
  • @Eugene, as my code did not use Futures to communicate between the tasks and main thread, i used the CountDownLatch. The primary advantage(just a personal understanding and could be wrong) is: If only a portion of every task (say initial fast in-memory operation followed by slow IO) is required by other part of a program, then i thought `CountDownLatch` might be a useful construct to coordinate. Otherwise, the user has to break these code in further granular tasks (separate set of tasks for in-memory and IO to speed up other waiting part of execution. but this might complicate data sharing). – Thiyanesh Feb 09 '21 at 06:47
  • @Eugene, on different note from the above comment, even removing the `executorService.shutdown();` would still result in correct count printed for `a` and `b`. (as the main thread in the example relies only on the state of these 2 values). Atomicity and visibility is controlled by `AtomicInteger` and wait/notify is handled by `CountDownLatch`. – Thiyanesh Feb 09 '21 at 07:03
  • but `AtomicInteger` already offers visibility and atomicity. If you want to see `10` as the result, always, `CountDownLatch` is not needed here. – Eugene Feb 09 '21 at 12:06
  • @Eugene, With the code as it is, `CountDownLatch` is used to communicate the completion of the executor tasks to main thread. – Thiyanesh Feb 10 '21 at 01:40
  • @Eugene, My previous comment also states this: `Atomicity and visibility is controlled by AtomicInteger and wait/notify is handled by CountDownLatch.` – Thiyanesh Feb 10 '21 at 01:41
  • you're right. I missed that there is no `awaitTermination`. But this made me scratch my head a few times. Can you get atomicity, _without_ visibility? Cause right now, even if you drop the `CountDownLatch`, you will still see `10`, because `AtomicInteger::incrementAndGet` is documented to use that `VarHandle::getAndAdd` with proper volatile semantics... if it makes sense what I am trying to say. – Eugene Feb 10 '21 at 22:13
4

Your pool has 1 thread, and you submit 10 Runnables to it. They will all pile-up in a queue, until it's their turn to execute. Instead of waiting for all of them to finish, you call shutDown, effectively saying : "no more tasks will this pool take". When exactly is that going to happen and how many tasks have already been processed before the call to shutDown happened, is impossible to tell. As such, you get a very non-deterministic result. You could even see 10 as the output (sometimes), but that does not mean this is correct.

Instead, you can wait for the pool to finish executing all of its tasks:

executorService.awaitTermination(2, TimeUnit.SECONDS);
executorService.shutdown();

What slightly "sucks" is that awaitTermination does not explicitly mentions that if it returns true, it would establish a happens-before relationship. So to be pedantic with the JLS, you would need to work with that Semaphore for example, to establish the needed guarantees.


You have a race in your code, by updating a shared a and b from multiple threads (even if you currently use Executors.newFixedThreadPool(1)), without any synchronization. So that needs correction also. And a Semaphore semaphore = new Semaphore(3); is not going to help, since you still will allow 3 concurrent threads to work on those variables; you would need only a single permit. But then, this acts as Lock more then a Semaphore.

Eugene
  • 117,005
  • 15
  • 201
  • 306
  • As per the question, there are visibility issues in `a` and `b` across threads to deterministically increment the values. – Thiyanesh Feb 08 '21 at 17:42
  • Even with this change, we may not guarantee `10` as output. We need to handle `a++` and `b++` atomically and with happens-before – Thiyanesh Feb 08 '21 at 17:44
  • @Horse and your reasonings are based on exactly what? – Eugene Feb 08 '21 at 17:45
  • as per OP `I don't understand the value of 'a' is 0,Why is 'a' not 10`, the expectation is to get `10` as the computed value. This operation ```public void add(){ a++; }``` does not guarantee visibility to same value across threads and also does not guarantee atomic increment of values. As per your current change, we can wait for completion of all tasks before accessing `a` or `b`, but how are we guaranteeing the atomic increments of properly sequenced access to the values? I might be incorrect, but this is my understanding. – Thiyanesh Feb 08 '21 at 17:50
  • Irrespective of the number of threads (in this case only `2` - 1. main thread and 2. task thread), is there a guarantee to the freshness when these threads are scheduled by thread pool(even sequentially due to size 1) across multiple cores based on a previous copy of a value? Maybe atomicity can be fine as there is only 1 thread accessing the value at any given time(due to thread pool size - `1`). – Thiyanesh Feb 08 '21 at 17:53
  • 1
    @Horse you' re right. I was so caught up in understanding if `CompletableFuture::join` would provide the needed _visibility_ guarantees, that I missed the obvious. thank you for the heads-up here. – Eugene Feb 09 '21 at 05:03
  • Thank you for taking your time to share the update. – Thiyanesh Feb 09 '21 at 06:50
3

The other Answers are correct, with important points. In addition, let me show how future technology being developed in Project Loom will simplify such code.

Project Loom

Project Loom will be bringing some changes to Java. Experimental builds of Loom technology, based on early-access Java 17, are available now. The Loom team is soliciting feedback.

AutoCloseable and try-with-resources

One change is that ExecutorService extends AutoCloseable. This means we can use try-with-resources syntax to conveniently and automatically close the service after the try-block completes.

Block until submitted tasks are done

Furthermore, the flow-of-control blocks on that try-block until all the submitted tasks are done/failed/canceled. No need to track progress of the individual tasks, unless you care to.

try (
        ExecutorService executorService = Executors.newVirtualThreadExecutor() ;
)
{
    … submit tasks to the executor service, to be run on background threads.
}
// At this point, all submitted are done/failed/canceled. 
// At this point, the executor service is automatically being shut down.

AtomicInteger

As others said, your use of int primitives for a & b variables across threads may fail because of visibility issues in the Java Memory Model. One option is to mark them as volatile.

I prefer the alternative, using the Atomic… classes. Replace those int vars with AtomicInteger objects to wrap the incrementing count number.

Mark those member fields final so each instance is never replaced.

// Member fields
final AtomicInteger a, b;

// Constructor
public Incrementor ( )
{
    this.a = new AtomicInteger();
    this.b = new AtomicInteger();
}

To increment by one the value within the AtomicInteger, we call incrementAndGet. This call returns the new incremented number. So I altered the signature of your add methods to show that we can return the new value, if ever needed.

// Logic
public int addA ( )
{
    return this.a.incrementAndGet();
}

public int addB ( )
{
    return this.b.incrementAndGet();
}

Virtual threads

Another feature coming in Project Loom is virtual threads, also known as fibers. Many of these lightweight threads are mapped to run on platform/kernel threads. If your code often blocks, then using virtual threads will dramatically speed up performance of your app. Use the new feature by calling Executors.newVirtualThreadExecutor.

try (
        ExecutorService executorService = Executors.newVirtualThreadExecutor() ;
)
{ … }

Example class

I have written a class named Incrementor to be similar to yours. Using such a class looks like this:

        Incrementor incrementor = new Incrementor();
        try (
                ExecutorService executorService = Executors.newVirtualThreadExecutor() ;
        )
        {
            for ( int i = 0 ; i < 10 ; i++ )
            {
                executorService.submit( ( ) -> {
                    int newValueA = incrementor.addA();
                    int newValueB = incrementor.addB();
                    System.out.println( "Thread " + Thread.currentThread().getId() + " incremented a & b to: " + newValueA + " & " + newValueB + " at " + Instant.now() );
                } );
            }
        }

Coordinating a & b

Caveat: As others commented, such code does not atomically increment both a & b together in synch. Apparently that is not a need of yours, so I ignore the issue. You can see that behavior in action, in the example run output shown at bottom below, where two threads interleaved during their access to a & b. Excerpted here:

Thread 24 incremented a & b to: 10 & 9 at 2021-02-09T02:21:30.270246Z
Thread 23 incremented a & b to: 9 & 10 at 2021-02-09T02:21:30.270246Z

Full class code

Pull together all this code.

Notice the simplicity of the code when (a) under Loom, and (b) using Atomic… constants. No need for semaphores, latches, CompletableFuture, nor calling ExecutorService#shutdown.

package work.basil.example;

import java.time.Instant;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class Incrementor
{
    // Member fields
    final AtomicInteger a , b ;

    // Constructor
    public Incrementor ( )
    {
        this.a = new AtomicInteger();
        this.b = new AtomicInteger();
    }

    // Logic
    public int addA ( )
    {
        return this.a.incrementAndGet();
    }

    public int addB ( )
    {
        return this.b.incrementAndGet();
    }
}

And a main method to demonstrate using that class.

    public static void main ( String[] args )
    {
        // Exercise this class by instantiating, then incrementing ten times.
        System.out.println( "INFO - `main` starting the demo. " + Instant.now() );

        Incrementor incrementor = new Incrementor();
        try (
                ExecutorService executorService = Executors.newVirtualThreadExecutor() ;
        )
        {
            for ( int i = 0 ; i < 10 ; i++ )
            {
                executorService.submit( ( ) -> {
                    int newValueA = incrementor.addA();
                    int newValueB = incrementor.addB();
                    System.out.println( "Thread " + Thread.currentThread().getId() + " incremented a & b to: " + newValueA + " & " + newValueB + " at " + Instant.now() );
                } );
            }
        }

        System.out.println( "INFO - At this point all submitted tasks are done/failed/canceled, and executor service is shutting down. " + Instant.now() );
        System.out.println( "incrementor.a.get() = " + incrementor.a.get() );
        System.out.println( "incrementor.b.get() = " + incrementor.b.get() );
        System.out.println( "INFO - `main` ending. " + Instant.now() );
    }

When run.

INFO - `main` starting the demo. 2021-02-09T02:21:30.173816Z
Thread 18 incremented a & b to: 4 & 4 at 2021-02-09T02:21:30.245812Z
Thread 14 incremented a & b to: 1 & 1 at 2021-02-09T02:21:30.242306Z
Thread 20 incremented a & b to: 6 & 6 at 2021-02-09T02:21:30.246784Z
Thread 21 incremented a & b to: 8 & 8 at 2021-02-09T02:21:30.269666Z
Thread 22 incremented a & b to: 7 & 7 at 2021-02-09T02:21:30.269666Z
Thread 17 incremented a & b to: 3 & 3 at 2021-02-09T02:21:30.243580Z
Thread 24 incremented a & b to: 10 & 9 at 2021-02-09T02:21:30.270246Z
Thread 23 incremented a & b to: 9 & 10 at 2021-02-09T02:21:30.270246Z
Thread 16 incremented a & b to: 2 & 2 at 2021-02-09T02:21:30.242335Z
Thread 19 incremented a & b to: 5 & 5 at 2021-02-09T02:21:30.246646Z
INFO - At this point all submitted tasks are done/failed/canceled, and executor service is shutting down. 2021-02-09T02:21:30.279542Z
incrementor.a.get() = 10
incrementor.b.get() = 10
INFO - `main` ending. 2021-02-09T02:21:30.285862Z
Basil Bourque
  • 303,325
  • 100
  • 852
  • 1,154
  • Thank you very much for reminding about Project Loom. I have downloaded based on your answer. – Thiyanesh Feb 09 '21 at 03:18
  • Thank you for your generous answer. It's very detailed – developYan Feb 09 '21 at 03:42
  • 2
    @Horse I highly recommend the videos by Ron Pressler if you are trying Loom. See the more recent ones of 2020. Things have changed since the initial debut of Project Loom, such as no longer using `Fiber` as a class name. – Basil Bourque Feb 09 '21 at 04:04
  • @BasilBourque, Sure. Thanks again for sharing the information. – Thiyanesh Feb 09 '21 at 04:26
  • do not think that loom is panacea, not yet, at least. they have (big?) issues with ThreadLocals, at least. Which _partially_ works with `G1GC`, and in a "weird" way, at the moment. as far as I see it, it is far from reality yet. – Eugene Feb 09 '21 at 05:11
  • @Eugene Agreed. Thus my use of the word "experimental". As the dictionary says: *(of a new invention or product) based on untested ideas or techniques and not yet established or finalized* – Basil Bourque Feb 09 '21 at 05:17