I'm writing an audio DSP application and I've opted to use a producer-consumer model. I've been reading a lot about volatile and other threading issues, but I've got a couple of questions about some specifics of my case - particularly, one of the things I need to be shared between threads is an array of arrays.

I have a class which represents the producer. To allow for variation in processing time, the producer stores n buffers, which it fills in rotation: every time more audio data is available, it fills the next buffer and passes it on to the consumer thread.

I'll start with my questions, and then I'll try to explain my system in enough detail - sorry for the long post, thanks for bearing with me! I'd also be very grateful for general comments about my implementation and its thread safety.

My buffers are represented by a volatile byte[][] array. I am well aware that volatile only makes the reference volatile, not the array elements, but having read around SO and various blog posts, it seems that I have two options:

I could use AtomicIntegerArray. But:

  • Will I compromise performance for an application like this?

  • Is atomicity even what I need? I intend to write to the entire array in one go, then I need it to be visible to another thread, I don't need each individual write to be atomic or visible immediately.

If I understand correctly (e.g. this blog post), a self-assignment, which in my case is buffers[currentBuffer] = buffers[currentBuffer], will ensure publication. You will see this in my code below.

  • Is this correct, that it will cause all recent writes to become visible?

  • Does this work in the case of a 2D array like this?


I'll try to overview the producer class briefly; these are the instance variables:

// The consumer - just an interface with a process(byte[]) method
AudioInputConsumer consumer;

// The audio data source
AudioSource source;

// The number of buffers
int bufferCount;

// Controls the main producer loop
volatile boolean isRunning = false;

// The actual buffers
volatile byte[][] buffers;

// The number of buffers left to process.
// Shared counter - the producer increments it and checks that it has not run
// out of buffers, while the consumer decrements it when it processes a buffer
AtomicInteger buffersToProcess = new AtomicInteger(0);

// The producer thread.
Thread producerThread;

// The consumer thread.
Thread consumerThread;

Once I start the producerThread and consumerThread, they just execute the methods producerLoop and consumerLoop respectively.

producerLoop blocks while waiting for audio data, reads into the buffer, performs a self-assignment on the buffer, then uses the AtomicInteger instance to signal to the consumer loop.

private void producerLoop() {
  int bufferSize = source.getBufferSize();
  int currentBuffer = 0;

  while (isRunning) {
    if (buffersToProcess.get() == bufferCount) {
      //This thread must be faster than the processing thread, we have run out
      // of buffers: decide what to do
      System.err.println("WARNING: run out of buffers");
    }

    source.read(buffers[currentBuffer], 0, bufferSize); // Read data into the buffer
    buffers[currentBuffer] = buffers[currentBuffer];    // Self-assignment to force publication (?)
    buffersToProcess.incrementAndGet();                 // Signal to the other thread that there is data to read
    currentBuffer = (currentBuffer + 1) % bufferCount;  // Next buffer
  }
}

consumerLoop waits until the AtomicInteger buffersToProcess is greater than zero, then calls the consumer object to do whatever it wants with the data. Afterwards buffersToProcess is decremented and we wait for it to become nonzero again.

private void consumerLoop() {
  int currentBuffer = 0;

  while (isRunning) {
    if (buffersToProcess.get() > 0) {
      consumer.process(buffers[currentBuffer]);          // Process the data
      buffersToProcess.decrementAndGet();                // Signal that we are done with this buffer
      currentBuffer = (currentBuffer + 1) % bufferCount; // Next buffer
    }
    Thread.yield();
  }
}

Many thanks!

devrobf

3 Answers

2

You do need atomicity because writing to an array is a non-atomic process. Specifically, Java will certainly never guarantee that writes to array members will stay invisible to other threads until you choose to publish them.

One option is to create a new array each time, fully initialize it, and then publish over a volatile, but this may incur a significant cost due to Java's insistence that a newly allocated array must be first zeroed out, and due to GC overhead. You may overcome this with a "double buffering" scheme where you keep just two arrays and switch between them. This approach has its dangers: a thread may still be reading from the array which your writing thread has already marked as the inactive one. This highly depends on the precise details of your code.

The only other option is to do the whole reading and writing in a classic, boring, synchronized block. This has the advantage of being very predictable in latency. Personally, I'd start from this and move on to anything more complicated if absolutely pressed by an actual performance issue.
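A minimal sketch of the synchronized approach, assuming a single shared slot rather than the n rotating buffers from the question. The class `LockedBuffer` and its methods `tryWrite` and `tryRead` are hypothetical names chosen for illustration. Because both methods lock on the same object, everything the producer wrote inside `tryWrite` is visible to a consumer that later enters `tryRead`.

```java
// Hypothetical single-slot holder; every access to the array takes the same lock.
final class LockedBuffer {
    private final byte[] data;
    private boolean full = false;

    LockedBuffer(int size) {
        data = new byte[size];
    }

    // Producer side: copies src into the slot if it is empty.
    // Returns false (and writes nothing) while the previous contents are unread.
    synchronized boolean tryWrite(byte[] src) {
        if (full) {
            return false;
        }
        System.arraycopy(src, 0, data, 0, Math.min(src.length, data.length));
        full = true;
        return true;
    }

    // Consumer side: returns a copy of the slot's contents, or null if empty.
    synchronized byte[] tryRead() {
        if (!full) {
            return null;
        }
        full = false;
        return data.clone();
    }
}
```

The `clone()` in `tryRead` allocates on every read. If that matters, one variation would be to pass in a destination array owned by the consumer and copy into it while still holding the lock.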

You could also lock using read-write locks, but this would only pay off if multiple threads read the array concurrently. This does not seem to be your case.

Marko Topolnik
  • Thanks for your answer, I will give it some thought based on this. Just one clarification - you mention a double buffering scheme, which is essentially what I am doing, although I have a variable number of buffers. However I am trying to "force" publication using a self-assignment `buffers[i]=buffers[i]`, where `buffers` is type `byte[][]`. Does this work? Or should the self-assignment be `buffers=buffers`, since that is the volatile reference? – devrobf Aug 22 '13 at 12:36
  • No, `array[i]=array[i]` is basically a no-op. You can't make individual array members volatile. `buffers=buffers` should work. Of course, only if on the reading side you also read the reference to the outer array. – Marko Topolnik Aug 22 '13 at 12:38
  • Thanks a lot! Sorry, just one more: "only if on the reading side you also read the reference to the outer array"; I am reading from the exact same reference variable `buffers` if that's what you mean? I assume the alternative you were warning against is if I had some other reference to the same array on the reading side. – devrobf Aug 22 '13 at 12:44
  • A bad example would be if, at the reading end, you had `buffers` already written to some other variable, and you didn't touch the one and the same `buffers` variable at both ends. – Marko Topolnik Aug 22 '13 at 12:46
  • Yup, that's what I thought. Thanks again for your help. – devrobf Aug 22 '13 at 12:48
  • While we're at it, you must be careful to read from the volatile variable *only once* per publishing event. For example, don't read it every time in a loop: if you use `buffers[i][j]`, then the volatile `buffers` will be read each and every time. That would be both slow and introduce a race condition. – Marko Topolnik Aug 22 '13 at 12:48
  • And definitely stay away from anything other than primitive array elements if performance is dear to you :) You are right that using a container which works only for reference types would be bad. – Marko Topolnik Aug 22 '13 at 12:52
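The comments above can be put together in a rough sketch: the producer writes the volatile outer reference after filling a buffer, and the consumer reads that reference once into a local variable before looping over the elements. The class `HoistedVolatileReadDemo` and its methods are hypothetical. The `ready` counter stands in for `buffersToProcess` from the question and is what tells the consumer that a buffer has been filled; `AtomicInteger.get()` and `incrementAndGet()` themselves have volatile read and write memory effects according to the `java.util.concurrent.atomic` documentation.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; names are illustrative, not from the question.
final class HoistedVolatileReadDemo {
    private volatile byte[][] buffers = new byte[2][4];
    private final AtomicInteger ready = new AtomicInteger(0);

    // Producer side: fill one buffer, then publish.
    void produce(int index, byte value) {
        byte[][] local = buffers;                   // one volatile read
        java.util.Arrays.fill(local[index], value); // plain writes to the inner array
        buffers = local;                            // volatile write (the "buffers = buffers" idea)
        ready.incrementAndGet();                    // signal that a buffer is available
    }

    // Consumer side: returns the sum of the buffer, or -1 if nothing is ready.
    int consume(int index) {
        if (ready.get() == 0) {
            return -1;
        }
        byte[][] local = buffers;                   // read the volatile ONCE per publishing event
        byte[] buf = local[index];
        int sum = 0;
        for (int j = 0; j < buf.length; j++) {
            sum += buf[j];                          // no volatile reads inside the loop
        }
        ready.decrementAndGet();                    // hand the buffer back to the producer
        return sum;
    }
}
```

Writing `buffers[index][j]` inside the loop instead would re-read the volatile field on every iteration, which is the pattern the comment above warns against.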
1

Have a look at java.util.concurrent.ArrayBlockingQueue.

When you call take() on a blocking queue, it will automatically wait until something is made available.

Simply place the byte[] onto the queue and the consumer will process it. The need to know how many buffers are to be processed is irrelevant in this scenario. Usually, a "terminate" item in the queue would represent the last buffer. It would help to wrap the byte[] in a class with a boolean terminate flag.
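A minimal sketch of this idea, assuming a hypothetical wrapper class `AudioChunk` with a `terminate` flag and a hypothetical `QueueHandoffDemo.run` method that stands in for the producer and consumer loops. The "processing" here just sums the bytes so that the result can be checked; a real consumer would call `consumer.process(chunk.data)` instead.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical wrapper: a buffer plus a flag marking the end of the stream.
final class AudioChunk {
    final byte[] data;
    final boolean terminate;

    AudioChunk(byte[] data, boolean terminate) {
        this.data = data;
        this.terminate = terminate;
    }
}

final class QueueHandoffDemo {
    // Runs one producer and one consumer; returns the sum of all bytes consumed.
    static long run(int chunkCount, int chunkSize) {
        BlockingQueue<AudioChunk> queue = new ArrayBlockingQueue<>(4);
        long[] total = new long[1];

        Thread consumerThread = new Thread(() -> {
            try {
                while (true) {
                    AudioChunk chunk = queue.take();   // blocks until a chunk is available
                    if (chunk.terminate) {
                        break;                         // the "terminate" item ends the loop
                    }
                    for (byte b : chunk.data) {
                        total[0] += b;
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumerThread.start();

        try {
            for (int i = 0; i < chunkCount; i++) {
                byte[] data = new byte[chunkSize];     // stand-in for source.read(...)
                Arrays.fill(data, (byte) 1);
                queue.put(new AudioChunk(data, false)); // blocks while the queue is full
            }
            queue.put(new AudioChunk(new byte[0], true));
            consumerThread.join();                     // join makes total[0] safe to read
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return total[0];
    }
}
```

This sketch allocates a fresh array per chunk. To reuse a fixed set of buffers, one possible variation is a second queue on which the consumer returns processed arrays to the producer.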

Regards Yusuf

Yusuf Jakoet
  • Thanks for this - this could be a better way. I'm marking the other one as correct because it more directly answers my question but I will investigate this as an alternative approach, thanks! – devrobf Aug 22 '13 at 12:45
  • Submitting processing tasks to an executor service is also a great option: you get queueing plus multithreading in one safe package. – Marko Topolnik Aug 22 '13 at 12:54
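As a rough illustration of the executor suggestion, the sketch below submits one task per filled buffer to a single-threaded `ExecutorService`. The class `ExecutorHandoffDemo` and its `process` method are hypothetical stand-ins for the consumer's `process(byte[])`. The buffer contents are filled in before `submit` is called, and each task gets its own array in this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ExecutorHandoffDemo {
    // Hypothetical stand-in for consumer.process(byte[]): sums the samples.
    static int process(byte[] buffer) {
        int sum = 0;
        for (byte b : buffer) {
            sum += b;
        }
        return sum;
    }

    // Fills bufferCount buffers, submits each as a task, and returns the combined result.
    static int run(int bufferCount, int bufferSize) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            List<Future<Integer>> results = new ArrayList<>();
            for (int i = 0; i < bufferCount; i++) {
                byte[] buffer = new byte[bufferSize];  // stand-in for source.read(...)
                Arrays.fill(buffer, (byte) 2);
                results.add(executor.submit(() -> process(buffer))); // queued for the worker thread
            }
            int total = 0;
            for (Future<Integer> result : results) {
                total += result.get();                 // waits for that task to finish
            }
            return total;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();                       // lets the worker thread exit
        }
    }
}
```

A single-threaded executor keeps the buffers in submission order; a larger pool would process them in parallel, which may or may not suit audio data that has to stay in sequence.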
0

In addition to what @Marko Topolnik explained above about how volatile and atomics work: if you still want cross-thread visibility when writing into an array, you can use Unsafe.putLongVolatile(), Unsafe.putObjectVolatile() and the other methods in the same family.

Yes, it's not very Java-like, but it solves your problem.

Alex Suo