11

I have a Java application which has worker threads to process jobs. A worker produces a result object, say something like:

class WorkerResult{
    private final Set<ResultItems> items;
    public WorkerResult(Set<ResultItems> pItems){
         items = pItems;
    }
}

When the worker finishes, it does this operation:

 ...
 final Set<ResultItems> items = new SomeNonThreadSafeSetImplSet<ResultItems>();
 for(ResultItems producedItem : ...){
      items.add(producedItem);
 }
 passToGatherThread(items);

The items set is kind of a "unit-of-work" here. The passToGatherThread method passes the items set to a gather thread, of which only one exists at runtime.

Locking is not needed to prevent race conditions here, because only one thread (the gather thread) reads the items set. AFAICS, though, the gather thread may still not see all items because the set is not thread-safe, right?

Assume I cannot make passToGatherThread synchronized, say because it is a 3rd party library. What I basically fear is that the gather thread does not see all items because of caching, VM optimizations, etc. So here comes the question: How to pass the items set in a thread-safe manner, such that the Gather thread "sees" the proper set of items?

Duncan Jones
AlexLiesenfeld
  • I am not sure but perhaps [PipedStreams](http://docs.oracle.com/javase/6/docs/api/java/io/PipedInputStream.html) can help you? – oliholz Feb 11 '13 at 12:51
  • What is the definition of `passToGatherThread`? I think this is crucial for understanding whether the answers given below are correct. How exactly are `items` passed to the gather thread? – Duncan Jones Feb 11 '13 at 14:15
  • Do you really have problems with the objects' visibility or do you only suspect such behavior? Your concerns seem pretty far-fetched in those circumstances. – Dariusz Feb 11 '13 at 14:24
  • I suspect such behaviour ... it's not that I have experienced it so far, but it does not mean that there will not be such behaviour. I wanted to be sure, that's why I am asking. – AlexLiesenfeld Feb 11 '13 at 14:54
  • Concurrency bugs are really hard to detect. Just because xSNRG did not experience them so far does not mean that there are none. Things might be completely different on a different VM or machine with a different set of cores. – joergl Feb 11 '13 at 16:13

4 Answers

2

There seems to be no synchronization issue here. You create a new Set object for each passToGatherThread call and pass it on only after you have finished modifying it. No objects will be lost.

Set (and most Java collections) can be accessed concurrently by many threads provided that no modification to the collection is made. That's what Collections.unmodifiableCollection is for.

Since the mentioned passToGatherThread method serves as the communication channel to another thread, it must use some kind of synchronization internally, and every synchronization action establishes memory consistency between the threads involved.

Also - please note, that all writes to the objects in the passed collection are made before it is passed to the other thread. Even if the memory is copied into thread's local cache, it has the same unmodified value as in the other thread.
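As an illustration of a hand-off that synchronizes internally, here is a minimal sketch in which a hypothetical `passToGatherThread` is backed by a `BlockingQueue`. The class name, the queue, and the use of `String` items in place of `ResultItems` are assumptions made for the example; the real third-party method may work differently. The `java.util.concurrent` documentation states that actions in a thread prior to placing an object into a concurrent collection happen-before actions subsequent to its removal in another thread.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for a passToGatherThread that hands the set over via a queue.
class QueueHandoffSketch {

    // Assumed hand-off channel; the real library may use something else.
    private static final BlockingQueue<Set<String>> QUEUE = new LinkedBlockingQueue<>();

    static void passToGatherThread(Set<String> items) {
        // Writes made before add() are visible to the thread that later take()s the set.
        QUEUE.add(items);
    }

    // Fills a plain HashSet on a worker thread and returns what the calling (gather) thread receives.
    static Set<String> runOnce() {
        Thread worker = new Thread(() -> {
            Set<String> items = new HashSet<>(); // not thread-safe, but filled by one thread only
            items.add("a");
            items.add("b");
            items.add("c");
            passToGatherThread(items); // the worker never touches the set after this call
        });
        worker.start();
        try {
            return QUEUE.take(); // gather side
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnce().size()); // prints 3
    }
}
```

The set itself stays a plain `HashSet`; only the hand-off point is synchronized.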

Dariusz
  • AFAIK the JVM is allowed to cache data in processor core registers whenever possible. When writing data without "flushing" it out (using synchronized/volatile/etc.), other threads may see "stale" values or none at all, since "happened-before" semantics are only valid in the thread producing the data. – AlexLiesenfeld Feb 11 '13 at 13:11
  • @xSNRG added a paragraph. Provided the pass method is valid, there should be nothing to worry about. – Dariusz Feb 11 '13 at 13:19
  • Moreover - if this didn't work, a hell of a lot of applications would not work. – Dariusz Feb 11 '13 at 13:20
  • What I fear is described more precisely here under the keyword "stale": https://www.securecoding.cert.org/confluence/display/java/Concurrency,+Visibility,+and+Memory – AlexLiesenfeld Feb 11 '13 at 13:22
  • @xSNRG But doesn't that just affect shared variables (fields, and the like)? Surely passing the object as a parameter to a method is exempt from this? – Duncan Jones Feb 11 '13 at 13:23
  • The Set stores the passed values in private member variables, which are not written thread-safely: thread A stores them and thread B reads them, without synchronization or any other mechanism that prevents visibility issues. – AlexLiesenfeld Feb 11 '13 at 13:25
  • @Dariusz Wawer: The problem is that xSNRG says he cannot make `passToGatherThread` synchronized. – joergl Feb 11 '13 at 13:25
  • But the `passToGatherThread` has to be synchronized internally or the software is broken. If that's the case, you wouldn't want to use it anyway. Maybe you can get the sources and check it? – Dariusz Feb 11 '13 at 13:26
  • @Dariusz Wawer: You said: "Also - please note, that all writes to the objects in the passed collection are made before it is passed to the other thread" ... how did you come to that conclusion? The VM may reorder operations as it sees fit as long as it does not break the thread-local "happened-before" semantics (http://en.wikipedia.org/wiki/Happened-before). Am I missing something? – AlexLiesenfeld Feb 12 '13 at 09:47
  • @xSNRG Passing the set to `passToGatherThread` method will not happen before filling the set. Imagine that there is one thread only and the set is passed to a different method in the same class - do you think that the function call could occur before the set is filled? Because that's what you're implying. – Dariusz Feb 12 '13 at 13:17
1

You could simply use one of the thread-safe implementations of Set that Java provides for your WorkerResult. See for example:

Another option is to use Collections.synchronizedSet().
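A minimal sketch of both options, assuming `String` items in place of `ResultItems`; the class and method names are made up for the example. `Collections.synchronizedSet` wraps an existing set, while `ConcurrentHashMap.newKeySet()` (Java 8+) returns a concurrent set directly.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper class, for illustration only.
class ThreadSafeSetSketch {

    // Every call on the returned set locks the wrapper object.
    static Set<String> synchronizedWrapper() {
        return Collections.synchronizedSet(new HashSet<>());
    }

    // Concurrent set backed by a ConcurrentHashMap.
    static Set<String> concurrentSet() {
        return ConcurrentHashMap.newKeySet();
    }

    // Adds 1000 distinct items from each of two threads and returns the final size.
    static int fillFromTwoThreads(Set<String> target) {
        Thread first = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                target.add("first-" + i);
            }
        });
        Thread second = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                target.add("second-" + i);
            }
        });
        first.start();
        second.start();
        try {
            first.join();  // join() also makes the threads' writes visible to the caller
            second.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return target.size();
    }

    public static void main(String[] args) {
        System.out.println(fillFromTwoThreads(synchronizedWrapper())); // prints 2000
        System.out.println(fillFromTwoThreads(concurrentSet()));       // prints 2000
    }
}
```

Note that iterating over a `Collections.synchronizedSet` still requires the caller to synchronize on the set manually, as its documentation points out.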

joergl
  • Why is this needed? What is wrong with using a non-thread-safe `Set` implementation? – Duncan Jones Feb 11 '13 at 13:10
  • I think the OP is afraid of caching. Threads may cache non-`volatile` data and if writes on it are not synchronized never see an update. – joergl Feb 11 '13 at 13:15
  • Yes joergl is absolutely right. See my comment below the answer of Dariusz Wawer. – AlexLiesenfeld Feb 11 '13 at 13:18
  • In the light of everything that xSNRG has written here, unmodifiableSet will not solve anything - it provides no synchronization whatsoever. – Dariusz Feb 11 '13 at 15:30
  • @Dariusz Wawer: You are right, thanks! `unmodifiableSet` doesn't make guarantees concerning synchronization and therefore also visibility. Modifications to the original set would maybe not be visible in the immutable copy. I adjusted my answer. – joergl Feb 11 '13 at 16:10
1

I have thought about (and discussed) this question a lot and I have come up with another answer, which, I hope, will be the best solution.

Passing a synchronized collection is not good in terms of efficiency, because each subsequent operation on that collection will be synchronized - if there are many operations, it may prove to be a handicap.

To the point: let's make some assumptions (which I do not agree with):

  • the mentioned passToGatherThread method is indeed unsafe, however improbable it seems
  • compiler can reorder the events in the code so that the passToGatherThread is called before the collection is filled

The simplest, cleanest and possibly the most efficient way to ensure that the collection passed to gatherer method is ready and complete is to put the collection push in a synchronized block, like this:

synchronized(items) {
  passToGatherThread(items);
}

That way we ensure memory synchronization and a valid happens-before sequence before the collection is passed, thus making sure that all objects are passed correctly.
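As far as the Java Memory Model is concerned, releasing a monitor happens-before a later acquisition of that same monitor, so the guarantee depends on the gather side locking it as well. Below is a minimal sketch of that pairing. It swaps in a hypothetical one-slot `MailboxSketch` whose lock both threads take, rather than locking the set itself, and uses `String` items in place of `ResultItems`; none of these names come from the original code.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical one-slot mailbox; both sides synchronize on the same monitor (this).
class MailboxSketch {

    private Set<String> slot; // guarded by this

    // Worker side: leaving this method releases the monitor.
    synchronized void pass(Set<String> items) {
        slot = items;
        notifyAll();
    }

    // Gather side: entering this method acquires the same monitor.
    synchronized Set<String> take() throws InterruptedException {
        while (slot == null) {
            wait();
        }
        Set<String> items = slot;
        slot = null;
        return items;
    }

    // Fills a plain HashSet on a worker thread and returns the size the caller observes.
    static int demo() {
        MailboxSketch mailbox = new MailboxSketch();
        Thread worker = new Thread(() -> {
            Set<String> items = new HashSet<>();
            items.add("x");
            items.add("y");
            mailbox.pass(items); // all writes above precede the monitor release
        });
        worker.start();
        try {
            return mailbox.take().size();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 2
    }
}
```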

Dariusz
  • I think you are right, and, in the end this is exactly how I encountered this problem. It looks a little ugly doesn't it? Synchronizing on a newly created set, which to most people simply looks silly ... ;-) Thank you for your answer. – AlexLiesenfeld Feb 13 '13 at 20:09
0

The worker implements Callable and returns a WorkerResult:

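import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;

class Worker implements Callable<WorkerResult> {
    private final WorkerInput in;

    public Worker(WorkerInput in) {
        this.in = in;
    }

    @Override
    public WorkerResult call() {
        // do work here and collect the produced items
        Set<ResultItems> items = new HashSet<>();
        return new WorkerResult(items);
    }
}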

Then we use an ExecutorService to manage the thread pool, and collect the results using Future.

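import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PooledWorkerController {

    private static final int MAX_THREAD_POOL = 3;
    // One pool for the lifetime of the controller; its threads are reused across calls.
    private final ExecutorService pool =
        Executors.newFixedThreadPool(MAX_THREAD_POOL);

    public Set<ResultItems> process(List<WorkerInput> inputs)
            throws InterruptedException, ExecutionException {
        // Submit every job first so the workers can run in parallel.
        List<Future<WorkerResult>> submitted = new ArrayList<>();
        for (WorkerInput in : inputs) {
            Future<WorkerResult> future = pool.submit(new Worker(in));
            submitted.add(future);
        }
        // Future.get() blocks until the corresponding worker has finished.
        // getItems() is assumed to be a getter for the items set on WorkerResult.
        Set<ResultItems> results = new HashSet<>();
        for (Future<WorkerResult> future : submitted) {
            results.addAll(future.get().getItems());
        }
        return results;
    }
}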
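For the visibility concern in the question, the relevant property is that, according to the `java.util.concurrent` documentation, actions taken by the asynchronous computation behind a `Future` happen-before actions that follow the retrieval of its result via `Future.get()`. A self-contained sketch of the same pattern follows, assuming `String` inputs and items in place of `WorkerInput` and `ResultItems`; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical, simplified variant of the controller above.
class FutureVisibilitySketch {

    static Set<String> collect(List<String> inputs) {
        // Created per call only to keep the sketch self-contained; a real
        // application would normally reuse one long-lived pool.
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            List<Future<Set<String>>> submitted = new ArrayList<>();
            for (String in : inputs) {
                Callable<Set<String>> task = () -> {
                    Set<String> items = new HashSet<>(); // plain, non-thread-safe set
                    items.add(in + "-1");
                    items.add(in + "-2");
                    return items;
                };
                submitted.add(pool.submit(task));
            }
            Set<String> results = new HashSet<>();
            for (Future<Set<String>> future : submitted) {
                // get() makes the worker's writes to its set visible to this thread.
                results.addAll(future.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> inputs = new ArrayList<>();
        inputs.add("job1");
        inputs.add("job2");
        System.out.println(collect(inputs).size()); // prints 4
    }
}
```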
Mark Butler
  • So you create a thread each time a job is started. -1 for that. – Dariusz Feb 12 '13 at 07:30
  • In your post you introduced the bad practice of creating threads each time a batch of jobs is processed. Thread creation is very expensive and whenever possible threads should be created once and reused. Your code would be IMO much, much better if you had a single statically initialized thread pool which would be used in `process`. Creating a new thread pool for each `process` call does not change much. – Dariusz Feb 12 '13 at 11:08