
I have a single writer thread and a single reader thread that update and process a pool of arrays (references stored in a map). The ratio of writes to reads is almost 5:1 (the latency of writes is a concern).

The writer thread needs to update a few elements of an array in the pool based on some events. The entire write operation (all elements) needs to be atomic.

I want to ensure that the reader thread reads the previously updated array if the writer thread is currently updating it (something like volatile, but on the entire array rather than on individual fields). Basically, I can afford to read stale values, but not to block.

Also, since the writes are so frequent, it would be really expensive to create new objects or to lock the entire array on every read/write.

Is there a more efficient data structure that could be used, or cheaper locks?

trequartista

8 Answers


How about this idea: The writer thread does not mutate the array. It simply queues the updates.

The reader thread, whenever it enters a read session that requires a stable snapshot of the array, applies the queued updates to the array, then reads the array.

class Update
{
    final int position;
    final Object value;

    Update(int position, Object value)
    {
        this.position = position;
        this.value = value;
    }
}

// Unbounded queue. (An ArrayBlockingQueue of capacity Integer.MAX_VALUE
// would try to pre-allocate its entire backing array up front.)
BlockingQueue<Update> updates = new LinkedBlockingQueue<>();

void write(int position, Object value)
{
    updates.add(new Update(position, value));
}

Object[] read()
{
    // drain everything queued so far, then expose the array
    Update update;
    while ((update = updates.poll()) != null)
        array[update.position] = update.value;

    return array;
}
ZhongYu
  • @vemv good point. `read()` can call `updates.toArray()` to handle only a limited batch of updates. But if the writer is so fast that the reader cannot keep up, we'll need to throttle on the write side. – ZhongYu Mar 16 '13 at 02:36

Another idea, given that the array contains only 20 doubles.

Have two arrays, one for write, one for read.

Reader locks the read array during read.

read()
    lock();
    read stuff
    unlock();

The writer first modifies the write array, then tries to lock the read array. If locking fails, fine: write() simply returns. If locking succeeds, it copies the write array to the read array, then releases the lock.

write()
    update write array
    if tryLock()
        copy write array to read array
        unlock()

Reader can be blocked, but only for the time it takes to copy the 20 doubles, which is short.

The reader should use a spin lock, like `do {} while (!tryLock());`, to avoid being suspended.
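
For concreteness, here is a minimal Java sketch of this scheme, assuming a `java.util.concurrent.locks.ReentrantLock` over the read array and the 20-double arrays mentioned above (the class and method signatures are my own, not part of the answer):

import java.util.concurrent.locks.ReentrantLock;

class DoubleBuffer {
    private final double[] writeArray = new double[20];
    private final double[] readArray = new double[20];
    private final ReentrantLock readLock = new ReentrantLock();

    // Writer thread: always updates writeArray, then publishes it
    // only if the reader is not holding the lock right now.
    void write(int index, double value) {
        writeArray[index] = value;
        if (readLock.tryLock()) {
            try {
                System.arraycopy(writeArray, 0, readArray, 0, writeArray.length);
            } finally {
                readLock.unlock();
            }
        }
        // else: skip publishing; writeArray accumulates all updates,
        // so the next successful write publishes this change too
    }

    // Reader thread: spins instead of parking, then reads under the lock.
    double read(int index) {
        while (!readLock.tryLock()) { /* spin */ }
        try {
            return readArray[index];
        } finally {
            readLock.unlock();
        }
    }
}

If tryLock() fails, nothing is lost: the change stays in the write array and is published by the next successful write, which is exactly the batching behavior described above.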

ZhongYu
  • My only complaint is that if the reader is going to lock for a lot of time, the writer will be busy-waiting instead of updating the "write array" in response to possible "events" (as described in the question). Thus, figuring out an optimal strategy for the writer is not straightforward. – deprecated Mar 16 '13 at 03:40
  • @vemv writer never blocks. if it cannot lock the read array to update it, just forget about it. – ZhongYu Mar 16 '13 at 04:16
  • If `tryLock` fails and since then the writer doesn't receive new events to write, the reader will never be able to observe the latest write. – deprecated Mar 16 '13 at 04:28
  • true. though the assumption is that write is very busy. – ZhongYu Mar 16 '13 at 04:42
  • True, too :) but anyway, my second answer (which is inspired by yours) http://stackoverflow.com/a/15445130/569050 is mutex-free, non-blocking, and thread-safe. There's only the cost of copying, plus doubling the number of arrays (read versions + write versions). – deprecated Mar 16 '13 at 05:00

Is there a more efficient data structure?

Yes, absolutely! They're called persistent data structures. They are able to represent a new version of a vector/map/etc. merely by storing the differences with respect to a previous version. All versions are immutable, which makes them appropriate for concurrency (writers don't interfere with/block readers, and vice versa).

In order to express change, one stores references to a persistent data structure in a reference type such as AtomicReference, and changes what those references point to - not the structures themselves.

Clojure provides a top-notch implementation of persistent data structures. They're written in pure, efficient Java.

The following program shows how one would approach your described problem using persistent data structures.

import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicReference;

import clojure.lang.IPersistentVector;
import clojure.lang.PersistentVector;

public class AtomicArrayUpdates {

    public static Map<Integer, AtomicReference<IPersistentVector>> pool
        = new HashMap<>();
    public static Random rnd = new Random();
    public static final int SIZE = 60000;
    // For simulating the reads/writes ratio
    public static final int SLEEP_TIME = 5;

    static {
        for (int i = 0; i < SIZE; i++) {
            pool.put(i, new AtomicReference<IPersistentVector>(PersistentVector.EMPTY));
        }
    }

    public static class Writer implements Runnable {   
        @Override public void run() {
            while (true) {
                try {
                    Thread.sleep(SLEEP_TIME);
                } catch (InterruptedException e) {}

                int index = rnd.nextInt(SIZE);
                IPersistentVector vec = pool.get(index).get();

                // note how we repeatedly assign vec to a new value
                // cons() means "append a value".
                vec = vec.cons(rnd.nextInt(SIZE + 1)); 
                // assocN(): "update" at index 0
                vec = vec.assocN(0, 42); 
                // appended values are nonsense, just an example!
                vec = vec.cons(rnd.nextInt(SIZE + 1)); 

                pool.get(index).set(vec);

            }
        }
    }

    public static class Reader implements Runnable {
        @Override public void run() {
            while (true) {
                try {
                    Thread.sleep(SLEEP_TIME * 5);
                } catch (InterruptedException e) {}

                IPersistentVector vec = pool.get(rnd.nextInt(SIZE)).get();
                // Now you can do whatever you want with vec.
                // nothing can mutate it, and reading it doesn't block writers!
            }
        } 
    }

    public static void main(String[] args) {
        new Thread(new Writer()).start();
        new Thread(new Reader()).start();
    }
}
deprecated

I would do as follows:

  • synchronize the whole thing and see if the performance is good enough. Considering you only have one writer thread and one reader thread, contention will be low and this could work well enough

    private final Map<Key, double[]> map = new HashMap<> ();
    
    public synchronized void write(Key key, double value, int index) {
        double[] array = map.get(key);
        array[index] = value;
    }
    
    public synchronized double[] read(Key key) {
        return map.get(key);
    }
    
  • if it is too slow, I would have the writer make a copy of the array, change some values and put the new array back into the map. Note that array copies are very fast - typically, a 20-item array would most likely take less than 100 nanoseconds to copy:

    //If all the keys and arrays are constructed before the writer/reader threads 
    //start, no need for a ConcurrentMap - otherwise use a ConcurrentMap
    private final Map<Key, AtomicReference<double[]>> map = new HashMap<> ();
    
    public void write(Key key, double value, int index) {
        AtomicReference<double[]> ref = map.get(key);
        double[] oldArray = ref.get();
        double[] newArray = oldArray.clone();
        newArray[index] = value;
        //you might want to check the return value to see if it worked
    //or you might just skip the update if another write was performed
        //in the meantime
        ref.compareAndSet(oldArray, newArray);
    }
    
    public double[] read(Key key) {
        return map.get(key).get(); //check for null
    }
    

since the writes are so frequent, it would be really expensive to create new objects or lock the entire array while read/write.

How frequent? Unless there are hundreds of them every millisecond you should be fine.

Also note that:

  • object creation is fairly cheap in Java (think around 10 CPU cycles = a few nanoseconds)
  • garbage collection of short-lived objects is generally free (as long as an object stays in the young generation, it is not visited by the GC if it is unreachable)
  • whereas long-lived objects have a GC performance impact, because they need to be copied across to the old generation
assylias
  • 1
    A minor point about short-lived objects being free to allocate: While they do not increase the cost of a garbage collection run, they do increase their frequency, and thus increase overall garbage collection overhead. Of course, that overhead is so small it's very unlikely to matter. – meriton Mar 16 '13 at 14:05

The following variation is inspired by both my previous answer and one of zhong.j.yu's.

Writers don't interfere with/block readers and vice versa, and there are no thread-safety/visibility issues or delicate reasoning involved.

import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicReference;

public class V2 {

    // populated with all keys/arrays before the writer/reader threads start
    static Map<Integer, AtomicReference<double[]>> committed = new HashMap<>();
    static Random rnd = new Random();

    static class Writer {
        // the writer's private, freely mutable working copies
        private Map<Integer, double[]> writeable = new HashMap<>();

        void write() {
            int i = rnd.nextInt(writeable.size());
            // manipulate writeable.get(i)...
            // publish a snapshot; readers never see a half-written array
            committed.get(i).set(writeable.get(i).clone());
        }
    }

    static class Reader {
        void read() {
            double[] arr = committed.get(rnd.nextInt(committed.size())).get();
            // do something useful with arr...
        }
    }
}
deprecated
  • You need two static references, readArray and writeArray, and a simple mutex to track when writeArray has been changed.

  • have a locked function called changeWriteArray that makes changes to a deep copy of writeArray:

    synchronized String[] changeWriteArray(String[] writeArrayCopy /*, other params go here */) {
        // here, make changes to the deep copy of writeArray...

        // then return the copy
        return writeArrayCopy;
    }
    
  • Notice that changeWriteArray is functional in style, with effectively no side effects, since it returns a copy that is neither readArray nor writeArray.

  • whoever calls changeWriteArray must call it as writeArray = changeWriteArray(writeArray.deepCopy()).

  • the mutex is changed by both changeWriteArray and updateReadArray, but is only checked by updateReadArray. If the mutex is set, updateReadArray will simply point the readArray reference at the actual block of writeArray, as in the sketch below.
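
A minimal sketch of how this scheme might look, under my own assumptions: the volatile boolean `dirty` stands in for the mutex, `clone()` stands in for `deepCopy()`, and everything except the names `changeWriteArray`, `updateReadArray`, `readArray` and `writeArray` is hypothetical:

class RefSwap {
    static volatile String[] readArray = new String[20];
    static String[] writeArray = new String[20];
    // the "mutex": set by the writer, checked and cleared by the reader
    static volatile boolean dirty = false;

    static synchronized String[] changeWriteArray(String[] writeArrayCopy,
                                                  int index, String value) {
        // mutate only the copy; neither readArray nor writeArray is touched
        writeArrayCopy[index] = value;
        return writeArrayCopy;
    }

    static void write(int index, String value) {
        writeArray = changeWriteArray(writeArray.clone(), index, value);
        dirty = true;
    }

    static void updateReadArray() {
        if (dirty) {
            // no copying: just repoint readArray at the current writeArray block
            readArray = writeArray;
            dirty = false;
        }
    }
}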

EDIT:

@vemv concerning the answer you mentioned: while the ideas are the same, the difference is significant. The two references are static, so no time is spent actually copying the changes into readArray; instead, the readArray reference is simply moved to point at writeArray. Effectively we are swapping by means of a temporary array that changeWriteArray generates as necessary. The locking here is also minimal, since reading does not require a lock at all; you can have more than one reader at any given time.

In fact, with this approach you can keep a count of concurrent readers and wait for the counter to reach zero before updating readArray from writeArray; again, reading requires no lock at all.

kasavbere

Improving on @zhong.j.yu's answer: it is really a good idea to queue the writes instead of trying to perform them as they occur. However, we must tackle the problem of updates coming in so fast that the reader would choke on them continuously. My idea: what if the reader only performs the writes that were queued before the read, ignoring subsequent writes (those would be tackled by the next read)?

You will need to write your own synchronized queue. It will be based on a linked list, and will contain only two methods:

public synchronized void enqueue(Write write);

This method will atomically enqueue a write. There is a possible problem if writes come in faster than they can actually be enqueued (the queue would grow without bound), but I think there would have to be hundreds of thousands of writes every second to achieve that.

public synchronized Element cut();

This will atomically empty the queue and return its head (or tail) as an Element object. It will contain a chain of other Elements (Element.next, etc., just the usual linked-list stuff), all of them representing the chain of writes since the last read. The queue will then be empty, ready to accept new writes. The reader can then traverse the Element chain (which by then is standalone, untouched by subsequent writes), perform the writes, and finally perform the read. While the reader processes the read, new writes are enqueued in the queue, but those will be the next read's problem. A rough sketch of such a queue follows.
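
Here is a minimal sketch of such a queue; the `Write` class and its fields are my assumption of what a queued update might look like, not part of the answer:

// Hypothetical representation of one queued update.
class Write {
    final int position;
    final double value;
    Write(int position, double value) {
        this.position = position;
        this.value = value;
    }
}

class WriteQueue {
    static class Element {
        final Write write;
        Element next;
        Element(Write write) { this.write = write; }
    }

    private Element head, tail;

    // Atomically appends a write at the tail of the list.
    public synchronized void enqueue(Write write) {
        Element e = new Element(write);
        if (tail == null) {
            head = tail = e;
        } else {
            tail.next = e;
            tail = e;
        }
    }

    // Atomically detaches the whole chain and empties the queue;
    // the caller then walks Element.next links without further locking.
    public synchronized Element cut() {
        Element chain = head;
        head = tail = null;
        return chain;
    }
}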

I wrote this once, albeit in C++, to represent a sound data buffer. There were more writes (the driver sends more data) than reads (some mathematical processing of the data), and the writes had to finish as soon as possible. (The data came in real time, so I needed to save it before the next batch was ready in the driver.)

Jakub Zaverka

I've got a funny solution using three arrays and a volatile boolean toggle. Basically, each thread has its own array. Additionally, there's a shared array controlled via the toggle.

When the writer finishes and the toggle allows it, it copies the newly written array into the shared array and flips the toggle.

Similarly, before the reader starts, when the toggle allows it, it copies the shared array into its own array and flips the toggle.

import java.util.function.Consumer;

public class MolecularArray {
    private final double[] writeArray;
    private final double[] sharedArray;
    private final double[] readArray;

    private volatile boolean writerOwnsShared;

    MolecularArray(int length) {
        writeArray = new double[length];
        sharedArray = new double[length];
        readArray = new double[length];
    }

    void read(Consumer<double[]> reader) {
        if (!writerOwnsShared) {
            copyFromTo(sharedArray, readArray);
            writerOwnsShared = true;
        }
        reader.accept(readArray);
    }

    void write(Consumer<double[]> writer) {
        writer.accept(writeArray);
        if (writerOwnsShared) {
            copyFromTo(writeArray, sharedArray);
            writerOwnsShared = false;
        }
    }

    private void copyFromTo(double[] from, double[] to) {
        System.arraycopy(from, 0, to, 0, from.length);
    }
}
  • It depends on the "single writer thread and single reader" assumption.
  • It never blocks.
  • It uses a constant (albeit huge) amount of memory.
  • Repeated calls to read without any intervening write do no copying and vice versa.
  • The reader does not necessarily see the most recent data, but it sees the data from the first write started after the previous read, if any.

I guess this could be improved by using two shared arrays.
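
For illustration, a minimal usage sketch (my own assumption of how the class would be driven; the loop bodies are placeholders):

MolecularArray arr = new MolecularArray(20);

// the single writer thread
new Thread(() -> {
    while (true) arr.write(a -> a[0] += 1.0);
}).start();

// the single reader thread
new Thread(() -> {
    while (true) arr.read(a -> System.out.println(a[0]));
}).start();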

maaartinus