
Let's say I have an instance of ExecutorService from one of the Executors static factory methods.

If I submit a Callable<RetVal> from some thread, where RetVal is a non-thread-safe, locally instantiated object, do I need to worry about RetVal's integrity when I get() it from the same thread? People say that local variables are thread-safe, but I am not sure whether that applies when you return a locally instantiated object and receive it from some other thread.

Here's an example similar to my situation:

ExecutorService executor = Executors.newFixedThreadPool(5);
Future<List<String>> fut = executor.submit(() -> {
    List<String> ret = new ArrayList<>();
    ret.add("aasdf");
    ret.add("dfls");
    return ret;
});

List<String> myList = fut.get();

In the above example, I'm retrieving an ArrayList that was created in a different thread--one created by the executor. I don't think the above code is thread-safe, but I was not able to find much information about my specific situation.

I tried the above code on my computer and it returned the expected result 100% of the time. I even tried it with my own implementation of an ExecutorService, and so far I have only gotten the expected results. So unless I have been extremely lucky, I am pretty sure it works, but I'm not sure how. I created a non-thread-safe object in one thread and received it in another; shouldn't there be a chance of receiving a partially constructed object--in my case, a list that does not contain 2 strings?

Below is my custom implementation I made just to test. You can ignore the EType enum thingy.

class MyExecutor {

    enum EType {
        NoHolder, Holder1, Holder2
    }

    private ConcurrentLinkedQueue<MyFutureTask<?>> tasksQ;
    private final Thread thread;

    private final EType eType;

    public MyExecutor(EType eType) {
        // Assign the field, not the parameter; otherwise the final field is never initialized.
        this.eType = Objects.requireNonNull(eType);

        tasksQ = new ConcurrentLinkedQueue<>();
        thread = new Thread(new MyRunnable());
        thread.start();
    }

    public <T> Future<T> submit(Callable<T> c) {
        MyFutureTask<T> task = new MyFutureTask<T>(c, eType);
        tasksQ.add(task);
        return task;
    }

    class MyRunnable implements Runnable {
        @Override
        public void run() {
            while (true) {
                if (tasksQ.isEmpty()) {
                    try {
                        Thread.sleep(1);
                        continue;
                    } catch (InterruptedException ite) {
                        Thread.interrupted();
                        break;
                    }
                }

                MyFutureTask<?> task = tasksQ.poll();
                try {
                    task.run();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    class MyFutureTask<T> implements RunnableFuture<T> {

        final Callable<?> cb;
        volatile Object outcome;

        static final int STATE_PENDING = 1;
        static final int STATE_EXECUTING = 2;
        static final int STATE_DONE = 3;

        final AtomicInteger atomicState = new AtomicInteger(STATE_PENDING);

        final EType eType;

        public MyFutureTask(Callable<?> cb, EType eType) {
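            // Assign the fields, not the parameters; otherwise the final fields are never initialized.
            this.cb = Objects.requireNonNull(cb);
            this.eType = Objects.requireNonNull(eType);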
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            throw new NotImplementedException();
        }

        @Override
        public boolean isCancelled() {
            return false;
        }

        @Override
        public boolean isDone() {
            return atomicState.get() == STATE_DONE;
        }

        @SuppressWarnings("unchecked")
        @Override
        public T get() throws InterruptedException, ExecutionException {
            while (true) {
                switch (atomicState.get()) {
                case STATE_PENDING:
                case STATE_EXECUTING:
//                      Thread.sleep(1);
                    break;
                case STATE_DONE:
                    return (T)outcome;
                default:
                    throw new IllegalStateException();
                }
            }
        }

        @Override
        public T get(long timeout, TimeUnit unit)
                throws InterruptedException, ExecutionException, TimeoutException {
            throw new NotImplementedException();
        }

        void set(T t) {
            outcome = t;
        }

        @Override
        public void run() {
            if (atomicState.compareAndSet(STATE_PENDING, STATE_EXECUTING)) {
                Object result;
                try {
                    switch (eType) {
                    case NoHolder:
                        result = cb.call();
                        break;
                    case Holder1:
                        throw new NotImplementedException();
                    case Holder2:
                        throw new NotImplementedException();
                    default:
                        throw new IllegalStateException();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    result = null;
                }

                outcome = result;
                atomicState.set(STATE_DONE);
            }
        }
    }
}

class MyTask implements Callable<List<Integer>> {
    @Override
    public List<Integer> call() throws Exception {
        List<Integer> ret = new ArrayList<>(100);
        IntStream.range(0, 100).boxed().forEach(ret::add);
        return ret;
    }
}
msl
  • Local variables are absolutely "thread-safe" in Java, but don't conflate the variables with the objects to which they refer. Any time you have threads talking to each other, then it's quite likely that you'll have local variables in two or more different threads that all refer to the same object. – Solomon Slow Jan 08 '20 at 14:46
  • @SolomonSlow Well, the variable that I've created in my task certainly is local, but in my case that variable is passed from the tasking thread to the calling thread without synchronization. I'm not sure if this is the case with Java's own ExecutorService implementations, but in mine it certainly is. So even though the tasking thread doesn't modify the variable after passing it to the calling thread, I'm curious what stops the JVM from reordering the instructions so that some numbers are added in the tasking thread after the calling thread receives it, since I didn't synchronize. – msl Jan 08 '20 at 14:54
  • The be all, end all is the _happens-before_ relationship. So long as such a relationship is created, which heavily depends on context, the code is thread-safe. There are certain actions which are guaranteed to _happen-before_ other actions; at least some are documented in the [`java.util.concurrent`](https://docs.oracle.com/en/java/javase/13/docs/api/java.base/java/util/concurrent/package-summary.html) package Javadoc, in the "Memory Consistency Properties" section. – Slaw Jan 08 '20 at 15:01
  • @Slaw In my implementation, the only happens-before relationships I created are through the volatile outcome and atomicState, but they should only guarantee a happens-before relationship for themselves. – msl Jan 08 '20 at 15:03
  • Re "The variable is passed..." That's exactly what I'm talking about. The thing that you passed is not a variable. When somebody told you that local variables are thread safe, they were telling you the truth, but they were not talking about the thing that you passed from one thread to another. The thing that you passed was a _reference_ to an object on the heap. When you have two or more threads that both have the same reference, then you have the possibility of a collision when the threads access the object to which it refers. – Solomon Slow Jan 08 '20 at 15:05
  • @SolomonSlow So are you saying that my code has a possibility to not produce the expected result? Because that's something I actually want to hear. – msl Jan 08 '20 at 15:08
  • Writes to a volatile field _happen-before_ subsequent reads of the same volatile field. The `compareAndSet` method [has volatile memory semantics](https://docs.oracle.com/en/java/javase/13/docs/api/java.base/java/util/concurrent/atomic/AtomicInteger.html#compareAndSet(int,int)). Since you set the `outcome` before setting the state to "done", if a thread reads the state as "done" then it will see the current value of `outcome`. – Slaw Jan 08 '20 at 15:20
  • In fact, I'm not sure `outcome` actually needs to be volatile in this case (as long as that `set(T)` method is never used). – Slaw Jan 08 '20 at 15:25
  • @Slaw Without volatile on `outcome`, wouldn't there be a visibility issue for `outcome` in threads calling get()? And even with volatile, calling threads may see the current pointer value stored in `outcome`, but I'm not sure that guarantees visibility of the underlying object's state. I might have to read up on "volatile" a bit more though. – msl Jan 08 '20 at 15:38
  • The documentation says: "_A write to a volatile field happens-before every subsequent read of that same field. Writes and reads of volatile fields have similar memory consistency effects as entering and exiting monitors, but do not entail mutual exclusion locking_". I understand that to mean actions prior to writing the volatile field will also be visible to threads subsequently reading the volatile field. Which means writing to `atomicState` guarantees the write to `outcome` _happens-before_ subsequent reads of `atomicState`. – Slaw Jan 08 '20 at 15:48
  • Since `outcome` is written to _before_ the state is set to "done", and another thread will only ever read `outcome` _after_ the state is set to "done", and since setting the state has volatile semantics, the visibility of `outcome` in the `get()` method should not be an issue. This seems to support my understanding: https://stackoverflow.com/questions/1351168/volatile-semantic-with-respect-to-other-fields – Slaw Jan 08 '20 at 15:51
  • Re, "are you saying my code has the possibility to...?" No. I was not trying to answer your question. Others here already have done that. I was trying to ensure that you understand the difference between variables, objects/instances, and object/instance references. Experienced programmers sometimes use those words interchangeably when the difference between them is not important. But sometimes the difference _is_ important (e.g., when you're analyzing the thread-safety of some snippet of code.) – Solomon Slow Jan 08 '20 at 16:13
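The visibility argument made in the comments above (an ordinary write that precedes a volatile write is visible to any thread that later reads that volatile) can be sketched as follows. This is a minimal illustration, not the code from the question: the class name `VolatilePiggyback`, its fields, and its methods are hypothetical, and `outcome` is deliberately left non-volatile to show that the volatile `done` flag alone carries the happens-before edge. `Thread.onSpinWait()` assumes Java 9 or later.

```java
import java.util.ArrayList;
import java.util.List;

public class VolatilePiggyback {
    // Plain (non-volatile) field: published only through the volatile flag below.
    private List<Integer> outcome;
    // Volatile flag: a write of true happens-before any read that observes true.
    private volatile boolean done;

    // Runs on the producer thread.
    void produce() {
        List<Integer> list = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            list.add(i);          // mutations of a non-thread-safe object
        }
        outcome = list;           // ordinary write
        done = true;              // volatile write: publishes everything above
    }

    // Runs on the consumer thread; busy-waits only to keep the sketch short.
    List<Integer> awaitResult() {
        while (!done) {           // volatile read
            Thread.onSpinWait();
        }
        return outcome;           // guaranteed to see the fully populated list
    }

    public static void main(String[] args) throws InterruptedException {
        VolatilePiggyback box = new VolatilePiggyback();
        Thread producer = new Thread(box::produce);
        producer.start();
        System.out.println(box.awaitResult().size()); // prints 100
        producer.join();
    }
}
```

The guarantee holds only because the producer stops touching the list before the volatile write; mutating it afterwards would reintroduce a data race.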

2 Answers


The important thing is the happens-before relationship. From the ExecutorService API docs:

Memory consistency effects: Actions in a thread prior to the submission of a Runnable or Callable task to an ExecutorService happen-before any actions taken by that task, which in turn happen-before the result is retrieved via Future.get().

So you are safe to transfer a mutable object like this. The ExecutorService implementation transfers the object via some form of safe publication.

Obviously, don't update the object in the original thread after returning it.

If you were to communicate between threads by stashing the object in a shared non-volatile field instead, that would be unsafe.
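A minimal sketch of the safe handoff described here, assuming a standard `Executors` thread pool; the class name `FutureHandoff` and the helper `buildOnWorker` are hypothetical names chosen for illustration. The worker thread builds and mutates a plain `ArrayList`, and the caller only touches it after `Future.get()` returns, which is the point at which the documented happens-before edge applies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureHandoff {
    // Builds a mutable list on a pool thread and hands it to the caller via Future.get().
    static List<String> buildOnWorker() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            Future<List<String>> fut = executor.submit(() -> {
                List<String> ret = new ArrayList<>();
                ret.add("aasdf");
                ret.add("dfls");
                return ret;       // the task never touches ret again after returning
            });
            // All actions of the task happen-before get() returns its result.
            return fut.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> myList = buildOnWorker();
        myList.add("added by caller");  // safe: only the caller holds the reference now
        System.out.println(myList);     // prints [aasdf, dfls, added by caller]
    }
}
```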

Tom Hawtin - tackline
  • Well the thing is, I actually implemented a very rough Executor myself which I posted and in it the result of my task is saved in a volatile field--outcome--which for my task isn't safely constructed because I added values to a non-threadsafe ArrayList after creating it, so I'm not sure how there can be a happens-before guarantee between my executing the task and the get() call. – msl Jan 08 '20 at 14:36
  • @msl I don't think reimplementing an executor helps. I've looked through the code, it seems to be implemented in terms of `volatile` and libraries that provide *happens-before*. But it is very rough - busy wait, sleep, `Thread.interrupted` instead of `Thread.interrupt`, `tasksQ` should be `final` - so many issues. – Tom Hawtin - tackline Jan 08 '20 at 15:09
  • Well, my implementation was not meant to be perfect; I actually wanted it to produce unexpected results, but in my tests it didn't. I don't think the things you mentioned in the above comment should result in an error, except for maybe "volatile"? The Thread.interrupted calls are irrelevant because I never interrupted the thread in my tests, and tasksQ doesn't need to be final for MyRunnable to use it because it was created before its enclosing thread was even started, which ensures its visibility. – msl Jan 08 '20 at 15:16
  • As for "volatile", I was going to try using "holder classes" that store the result in their only final field to guarantee visibility of all the fields of my result, but I never bothered because the tests didn't fail without them. – msl Jan 08 '20 at 15:18

Thread safety becomes a concern when multiple threads try to access and modify the same state simultaneously.

Note that you will not get hold of the actual result from a Future until the task is finished (i.e. Future#get will not return until the task is finished).

In your first example, thread safety is not an issue because a new object (while mutable) is created by one thread (the thread created by the Executor) and retrieved from the Future object once that thread has finished processing the task. Once the calling thread gets hold of the object, it cannot be modified by any other thread, because the creating thread no longer has access to the List.

marthursson
  • But isn't it possible for the JVM to, say, add the last number to the result list after the calling thread has received the result via get(), since I didn't try to synchronize any of this? Or does a function return--in this case from my task callback--have some special property that prevents this from happening? – msl Jan 08 '20 at 14:39
  • The key thing to understand is that the calling thread will not get a reference to the list until the task is finished (i.e. the Callable supplied to the Executor has returned). Execution of the main thread will be suspended on Future#get until the task is finished, at which time the list has been returned by the Callable and is made available to the calling thread. This is guaranteed. – marthursson Jan 09 '20 at 07:58
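The blocking behaviour described in the last comment can be observed with a small sketch. It assumes a single-thread executor and uses a `CountDownLatch` to hold the task back; the class name `GetBlocksUntilDone` and the helper `run` are hypothetical. The future is not done while the task is still waiting, and `get()` returns only after the `Callable` has returned the fully populated list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class GetBlocksUntilDone {
    static String run() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);
        try {
            Future<List<Integer>> fut = executor.submit(() -> {
                release.await();                 // task is held until the caller releases it
                List<Integer> ret = new ArrayList<>(100);
                for (int i = 0; i < 100; i++) {
                    ret.add(i);
                }
                return ret;
            });
            boolean doneBefore = fut.isDone();   // false: the task cannot have finished yet
            release.countDown();                 // let the task proceed
            List<Integer> list = fut.get();      // blocks until the Callable has returned
            return doneBefore + " " + fut.isDone() + " " + list.size();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // restore the interrupt flag
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "false true 100"
    }
}
```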