I'm writing an audio DSP application and I've opted to use a producer-consumer model. I've been reading a lot about volatile
and other threading issues, but I've got a couple of questions about some specifics of my case - particularly, one of the things I need to be shared between threads is an array of arrays.
I have a class which represents the producer. To allow for variation in processing time the producer stores n
buffers, which it will fill in rotation every time more audio data is available and pass the buffer on to the consumer thread.
I'll start with my questions, and then I'll try to explain my system in enough detail - sorry for the long post, thanks for bearing with me! I'd also be very grateful for general comments about my implementation and its thread safety.
My buffers are represented by a volatile byte[][]
array. I am well aware that the volatile
only makes the reference volatile, but having read around SO and various blog posts, it seems that I have two options:
I could use AtomicIntegerArray
. But:
Will I compromise performance for an application like this?
Is atomicity even what I need? I intend to write to the entire array in one go, then I need it to be visible to another thread, I don't need each individual write to be atomic or visible immediately.
If I understand correctly (e.g. this blog post), a self assignment, which in my case is: buffers[currentBuffer] = buffers[currentBuffer]
will ensure publication, which you will see in my code below.
Is this correct, that it will cause all recent writes to become visible?
Does this work in the case of a 2D array like this?
I'll try to overview the producer class briefly; these are the instance variables:
// The consumer - just an interface with a process(byte[]) method
AudioInputConsumer consumer;
// The audio data source
AudioSource source;
// The number of buffers
int bufferCount;
// Controls the main producer loop
volatile boolean isRunning = false;
// The actual buffers
volatile byte[][] buffers;
// The number of buffers left to process.
// Shared counter - the producer inrements and checks it has not run
// out of buffers, while the consumer decremenets when it processes a buffer
AtomicInteger buffersToProcess = new AtomicInteger(0);
// The producer thread.
Thread producerThread;
// The consumer thread.
Thread consumerThread;
Once I start the producerThread
and consumerThread
, they just execute the methods producerLoop
and consumerLoop
respectively.
producerLoop
blocks while waiting for audio data, reads into the buffer, performs a self-assignment on the buffer, then uses the AtomicInteger
instance to signal to the consumer loop.
private void producerLoop() {
int bufferSize = source.getBufferSize();
int currentBuffer = 0;
while (isRunning) {
if (buffersToProcess.get() == bufferCount) {
//This thread must be faster than the processing thread, we have run out
// of buffers: decide what to do
System.err.println("WARNING: run out of buffers");
}
source.read(buffers[currentBuffer], 0, bufferSize); // Read data into the buffer
buffers[currentBuffer] = buffers[currentBuffer]; // Self-assignment to force publication (?)
buffersToProcess.incrementAndGet(); // Signal to the other thread that there is data to read
currentBuffer = (currentBuffer + 1) % bufferCount; // Next buffer
}
}
consumerLoop
waits until the AtomicInteger
buffersToProcess
is greater than zero, then calls the consumer object to do whatever it wants with the data. Afterwards buffersToProcess
is decremented and we wait for it to become nonzero again.
private void consumerLoop() {
int currentBuffer = 0;
while (isRunning) {
if (buffersToProcess.get() > 0) {
consumer.process(buffers[currentBuffer]); // Process the data
buffersToProcess.decrementAndGet(); // Signal that we are done with this buffer
currentBuffer = (currentBuffer + 1) % bufferCount; // Next buffer
}
Thread.yield();
}
}
Many thanks!