I have an application in Java 8 Collecting Data of multiple Threads using BlockingQueue
.
I need to perform comparison of samples.
But my application is very large, I implemented a mock application (Github) in Java 8.
I'm generating a chunk of bytes (really is random order).
The bytes are stored into ChunkDTO
class.
I implemented a capturer the ChunkDTO
in a List, code in Capturer
class.
Each ChunkDTO
of List is translated into a List of Samples (TimePitchValue
exactly) returning a nested List (or List of List of TimePitchValue
).
Later the nested List is transposed in order to performs comparisons between TimePitchValue
with the same time value.
Due to enormous volume of TimePitchValue
instances it's consumes huge time in my application.
Here some code (The complete functional Code is in Github) because is still large for this site).
public class Generator {
final static Logger LOGGER = Logger.getLogger("SampleComparator");
public static void main(String[] args) {
long previous = System.nanoTime();
final int minBufferSize = 2048;
int sampleRate = 8192;
int numChannels = 1;
int numBytesPerSample = 1;
int samplesChunkPerSecond = sampleRate / minBufferSize;
int minutes = 0;
int seconds = 10;
int time = 60 * minutes + seconds;
int chunksBySecond = samplesChunkPerSecond * numBytesPerSample * numChannels;
int pitchs = 32;
boolean signed = false;
boolean endianness = false;
AudioFormat audioformat = new AudioFormat(sampleRate, 8 * numBytesPerSample, numChannels, signed, endianness);
ControlDSP controlDSP = new ControlDSP(audioformat);
BlockingQueue<ChunkDTO> generatorBlockingQueue = new LinkedBlockingQueue<>();
Capturer capturer = new Capturer(controlDSP, pitchs, pitchs * time * chunksBySecond, generatorBlockingQueue);
controlDSP.getListFuture().add(controlDSP.getExecutorService().submit(capturer));
for (int i = 0; i < time * chunksBySecond; i++) {
for (int p = 0; p < pitchs; p++) {
ChunkDTO chunkDTO = new ChunkDTO(UtilClass.getArrayByte(minBufferSize), i, p);
LOGGER.info(String.format("chunkDTO: %s", chunkDTO));
try {
generatorBlockingQueue.put(chunkDTO);
} catch (InterruptedException ex) {
LOGGER.info(ex.getMessage());
}
}
try {
Thread.sleep(1000 / chunksBySecond);
} catch (Exception ex) {
}
}
controlDSP.tryFinishThreads(Thread.currentThread());
long current = System.nanoTime();
long interval = TimeUnit.NANOSECONDS.toSeconds(current - previous);
System.out.println("Seconds Interval: " + interval);
}
}
Capturer
Class
public class Capturer implements Callable<Void> {
private final ControlDSP controlDSP;
private final int pitchs;
private final int totalChunks;
private final BlockingQueue<ChunkDTO> capturerBlockingQueue;
private final Counter intCounter;
private final Map<Long, List<ChunkDTO>> mapIndexListChunkDTO = Collections.synchronizedMap(new HashMap<>());
private volatile boolean isRunning = false;
private final String threadName;
private static final Logger LOGGER = Logger.getLogger("SampleComparator");
public Capturer(ControlDSP controlDSP, int pitchs, int totalChunks, BlockingQueue<ChunkDTO> capturerBlockingQueue) {
this.controlDSP = controlDSP;
this.pitchs = pitchs;
this.totalChunks = totalChunks;
this.capturerBlockingQueue = capturerBlockingQueue;
this.intCounter = new Counter();
this.controlDSP.getListFuture().add(this.controlDSP.getExecutorService().submit(() -> {
while (intCounter.getValue() < totalChunks) {
try {
Thread.sleep(100);
} catch (InterruptedException ex) {
LOGGER.log(Level.SEVERE, null, ex);
}
}
capturerBlockingQueue.add(new ChunkDTOStopper());
}));
this.threadName = this.getClass().getSimpleName();
}
@Override
public Void call() throws Exception {
long quantity = 0;
isRunning = true;
while (isRunning) {
try {
ChunkDTO chunkDTO = capturerBlockingQueue.take();
if (chunkDTO instanceof ChunkDTOStopper) {
break;
}
//Find or Create List (according to Index) to add the incoming Chunk
long index = chunkDTO.getIndex();
int sizeChunk = chunkDTO.getChunk().length;
List<ChunkDTO> listChunkDTOWithIndex = getListChunkDTOByIndex(chunkDTO);
//When the List (according to Index) is completed and processed
if (listChunkDTOWithIndex.size() == pitchs) {
mapIndexListChunkDTO.remove(index);
TransposerComparator transposerComparator = new TransposerComparator(controlDSP, controlDSP.getAudioformat(), index, sizeChunk, listChunkDTOWithIndex);
controlDSP.getListFuture().add(controlDSP.getExecutorService().submit(transposerComparator));
}
quantity++;
intCounter.setValue(quantity);
LOGGER.info(String.format("%s\tConsumes:%s\ttotal:%05d", threadName, chunkDTO, quantity));
} catch (Exception ex) {
LOGGER.log(Level.SEVERE, null, ex);
}
}
LOGGER.info(String.format("%s\tReceived:%05d\tQty:%s\tPitchs:%s\tEND\n", threadName, quantity, quantity / pitchs, pitchs));
return null;
}
private List<ChunkDTO> getListChunkDTOByIndex(ChunkDTO chunkDTO) {
List<ChunkDTO> listChunkDTOWithIndex = mapIndexListChunkDTO.get(chunkDTO.getIndex());
if (listChunkDTOWithIndex == null) {
listChunkDTOWithIndex = new ArrayList<>();
mapIndexListChunkDTO.put(chunkDTO.getIndex(), listChunkDTOWithIndex);
listChunkDTOWithIndex = mapIndexListChunkDTO.get(chunkDTO.getIndex());
}
listChunkDTOWithIndex.add(chunkDTO);
return listChunkDTOWithIndex;
}
}
TransposerComparator
class.
The optimization required is in this code, specifically on transposedNestedList
method.
public class TransposerComparator implements Callable<Void> {
private final ControlDSP controlDSP;
private final AudioFormat audioformat;
private final long index;
private final int sizeChunk;
private final List<ChunkDTO> listChunkDTOWithIndex;
private final String threadName;
private static final Logger LOGGER = Logger.getLogger("SampleComparator");
public TransposerComparator(ControlDSP controlDSP, AudioFormat audioformat, long index, int sizeChunk, List<ChunkDTO> listChunkDTOWithIndex) {
this.controlDSP = controlDSP;
this.audioformat = audioformat;
this.index = index;
this.sizeChunk = sizeChunk;
this.listChunkDTOWithIndex = listChunkDTOWithIndex;
this.threadName = this.getClass().getSimpleName() + "_" + String.format("%05d", index);
}
@Override
public Void call() throws Exception {
Thread.currentThread().setName(threadName);
LOGGER.info(String.format("%s\tINI", threadName));
try {
int numBytesPerSample = audioformat.getSampleSizeInBits() / 8;
int quantitySamples = sizeChunk / numBytesPerSample;
long baseTime = quantitySamples * index;
// Convert the List of Chunk Bytes to Nested List of TimePitchValue
List<List<TimePitchValue>> nestedListTimePitchValue = listChunkDTOWithIndex
.stream()
.map(chunkDTO -> {
return IntStream
.range(0, quantitySamples)
.mapToObj(time -> {
int value = extractValue(chunkDTO.getChunk(), numBytesPerSample, time);
return new TimePitchValue(chunkDTO.getPitch(), baseTime + time, value);
}).collect(Collectors.toList());
}).collect(Collectors.toList());
List<List<TimePitchValue>> timeNestedListTimePitchValue = transposedNestedList(nestedListTimePitchValue);
} catch (Exception ex) {
ex.printStackTrace();
LOGGER.log(Level.SEVERE, null, ex);
throw ex;
}
return null;
}
private static int extractValue(byte[] bytesSamples, int numBytesPerSample, int time) {
byte[] bytesSingleNumber = Arrays.copyOfRange(bytesSamples, time * numBytesPerSample, (time + 1) * numBytesPerSample);
int value = numBytesPerSample == 2
? (UtilClass.Byte2IntLit(bytesSingleNumber[0], bytesSingleNumber[1]))
: (UtilClass.byte2intSmpl(bytesSingleNumber[0]));
return value;
}
private static List<List<TimePitchValue>> transposedNestedList(List<List<TimePitchValue>> nestedList) {
List<List<TimePitchValue>> outNestedList = new ArrayList<>();
nestedList.forEach(pitchList -> {
pitchList.forEach(pitchValue -> {
List<TimePitchValue> listTimePitchValueWithTime = listTimePitchValueWithTime(outNestedList, pitchValue.getTime());
if (!outNestedList.contains(listTimePitchValueWithTime)) {
outNestedList.add(listTimePitchValueWithTime);
}
listTimePitchValueWithTime.add(pitchValue);
});
});
outNestedList.forEach(pitchList -> {
pitchList.sort(Comparator.comparingInt(TimePitchValue::getValue).reversed());
});
return outNestedList;
}
private static List<TimePitchValue> listTimePitchValueWithTime(List<List<TimePitchValue>> nestedList, long time) {
List<TimePitchValue> listTimePitchValueWithTime = nestedList
.stream()
.filter(innerList -> innerList.stream()
.anyMatch(timePitchValue -> timePitchValue.getTime() == time))
.findAny()
.orElseGet(ArrayList::new);
return listTimePitchValueWithTime;
}
}
I was testing:
With 5 Seconds in Generator
class and the List<List<TimePitchValue>> timeNestedListTimePitchValue = transposedNestedList(nestedListTimePitchValue);
line in TransposerComparator
class, Commented 7 Seconds needed, Uncommented 211 Seconds needed.
With 10 Seconds in Generator
class and the List<List<TimePitchValue>> timeNestedListTimePitchValue = transposedNestedList(nestedListTimePitchValue);
line in TransposerComparator
class, Commented 12 Seconds needed, Uncommented 574 Seconds needed.
I need to use the application at least 60 minutes.
With the purpose of reduce the needed (consumed) time, I have two ways:
- I choose for short is to optimize the methods that I am currently using.
- That should be successful but longer and is to use
GPGPU
, but I don't know where to start implementing it yet.
QUESTIONS
This Question is for the first way: What changes do you recommend in the code of the transposedNestedList
method in order to improve speed?
Is there better alternative to use this Comparison?
outNestedList.forEach(pitchList -> {
pitchList.sort(Comparator.comparingInt(TimePitchValue::getValue).reversed());
});