I have a Java 8 application that collects data from multiple threads using a BlockingQueue. I need to compare the collected samples.

Because my application is very large, I implemented a smaller mock application (on GitHub) in Java 8.

I'm generating chunks of bytes (in the real application they arrive in random order).

The bytes are stored in a ChunkDTO instance.

I implemented a capturer that collects the ChunkDTO instances in a List; the code is in the Capturer class.

Each ChunkDTO in the List is translated into a List of samples (TimePitchValue, to be exact), which yields a nested List (a List of Lists of TimePitchValue).

Later, the nested List is transposed in order to perform comparisons between TimePitchValue instances with the same time value.
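
To make the transposition concrete, here is a minimal sketch with plain strings standing in for TimePitchValue. The names TransposeSketch and transpose are illustrative only and are not code from the repository; the sketch also assumes that every pitch row has the same number of samples and that position t in each row corresponds to the same time value.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class TransposeSketch {

  // Hypothetical helper (not part of the question's code): turns rows indexed
  // by pitch into rows indexed by time. Assumes all rows have the same length.
  static <T> List<List<T>> transpose(List<List<T>> byPitch) {
    List<List<T>> byTime = new ArrayList<>();
    if (byPitch.isEmpty()) {
      return byTime;
    }
    int samples = byPitch.get(0).size();
    for (int t = 0; t < samples; t++) {
      // Collect the t-th sample of every pitch into one row.
      List<T> sameTime = new ArrayList<>(byPitch.size());
      for (List<T> pitchRow : byPitch) {
        sameTime.add(pitchRow.get(t));
      }
      byTime.add(sameTime);
    }
    return byTime;
  }

  public static void main(String[] args) {
    List<List<String>> byPitch = Arrays.asList(
        Arrays.asList("p0t0", "p0t1", "p0t2"),
        Arrays.asList("p1t0", "p1t1", "p1t2"));
    System.out.println(transpose(byPitch));
    // prints [[p0t0, p1t0], [p0t1, p1t1], [p0t2, p1t2]]
  }
}
```

Each inner list of the result then holds one value per pitch for a single time value, which is the shape the comparison needs.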

Due to the enormous volume of TimePitchValue instances, this step consumes a huge amount of time in my application.
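
For a rough sense of scale, the sketch below counts the TimePitchValue instances created for a given run length. It reuses the constants from the Generator class and assumes that UtilClass.getArrayByte(minBufferSize) returns exactly minBufferSize bytes; VolumeSketch and samplesFor are illustrative names, not code from the repository.

```java
class VolumeSketch {

  // Number of TimePitchValue instances for a run of the given length,
  // using the constants from the Generator class in the question.
  static long samplesFor(int seconds) {
    int sampleRate = 8192;
    int minBufferSize = 2048;   // bytes per chunk (assumed to be the array length)
    int numBytesPerSample = 1;
    int numChannels = 1;
    int pitchs = 32;

    int chunksBySecond = (sampleRate / minBufferSize) * numBytesPerSample * numChannels;
    long samplesPerChunk = minBufferSize / numBytesPerSample;
    return (long) seconds * chunksBySecond * pitchs * samplesPerChunk;
  }

  public static void main(String[] args) {
    System.out.println(samplesFor(10));    // prints 2621440
    System.out.println(samplesFor(3600));  // prints 943718400
  }
}
```

Each TransposerComparator task handles one index, that is 32 × 2048 = 65,536 instances at a time.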

Here is some code (the complete working code is on GitHub, because it is still too large for this site).

public class Generator {

  final static Logger LOGGER = Logger.getLogger("SampleComparator");

  public static void main(String[] args) {
    long previous = System.nanoTime();
    final int minBufferSize = 2048;
    int sampleRate = 8192;

    int numChannels = 1;
    int numBytesPerSample = 1;
    int samplesChunkPerSecond = sampleRate / minBufferSize;
    int minutes = 0;
    int seconds = 10;
    int time = 60 * minutes + seconds;
    int chunksBySecond = samplesChunkPerSecond * numBytesPerSample * numChannels;
    int pitchs = 32;

    boolean signed = false;
    boolean endianness = false;
    AudioFormat audioformat = new AudioFormat(sampleRate, 8 * numBytesPerSample, numChannels, signed, endianness);
    ControlDSP controlDSP = new ControlDSP(audioformat);
    BlockingQueue<ChunkDTO> generatorBlockingQueue = new LinkedBlockingQueue<>();

    Capturer capturer = new Capturer(controlDSP, pitchs, pitchs * time * chunksBySecond, generatorBlockingQueue);
    controlDSP.getListFuture().add(controlDSP.getExecutorService().submit(capturer));

    for (int i = 0; i < time * chunksBySecond; i++) {
      for (int p = 0; p < pitchs; p++) {
        ChunkDTO chunkDTO = new ChunkDTO(UtilClass.getArrayByte(minBufferSize), i, p);
        LOGGER.info(String.format("chunkDTO: %s", chunkDTO));
        try {
          generatorBlockingQueue.put(chunkDTO);
        } catch (InterruptedException ex) {
          LOGGER.info(ex.getMessage());
        }
      }
      try {
        Thread.sleep(1000 / chunksBySecond);
      } catch (Exception ex) {
      }
    }
    controlDSP.tryFinishThreads(Thread.currentThread());
    long current = System.nanoTime();
    long interval = TimeUnit.NANOSECONDS.toSeconds(current - previous);
    System.out.println("Seconds Interval: " + interval);
  }
}

Capturer Class

public class Capturer implements Callable<Void> {

  private final ControlDSP controlDSP;
  private final int pitchs;
  private final int totalChunks;
  private final BlockingQueue<ChunkDTO> capturerBlockingQueue;
  private final Counter intCounter;
  private final Map<Long, List<ChunkDTO>> mapIndexListChunkDTO = Collections.synchronizedMap(new HashMap<>());
  private volatile boolean isRunning = false;
  private final String threadName;
  private static final Logger LOGGER = Logger.getLogger("SampleComparator");

  public Capturer(ControlDSP controlDSP, int pitchs, int totalChunks, BlockingQueue<ChunkDTO> capturerBlockingQueue) {
    this.controlDSP = controlDSP;
    this.pitchs = pitchs;
    this.totalChunks = totalChunks;
    this.capturerBlockingQueue = capturerBlockingQueue;
    this.intCounter = new Counter();

    this.controlDSP.getListFuture().add(this.controlDSP.getExecutorService().submit(() -> {
      while (intCounter.getValue() < totalChunks) {
        try {
          Thread.sleep(100);
        } catch (InterruptedException ex) {
          LOGGER.log(Level.SEVERE, null, ex);
        }
      }
      capturerBlockingQueue.add(new ChunkDTOStopper());
    }));
    this.threadName = this.getClass().getSimpleName();
  }


  @Override
  public Void call() throws Exception {
    long quantity = 0;
    isRunning = true;
    while (isRunning) {
      try {
        ChunkDTO chunkDTO = capturerBlockingQueue.take();
        if (chunkDTO instanceof ChunkDTOStopper) {
          break;
        }
        //Find or Create List (according to Index) to add the incoming Chunk
        long index = chunkDTO.getIndex();
        int sizeChunk = chunkDTO.getChunk().length;
        List<ChunkDTO> listChunkDTOWithIndex = getListChunkDTOByIndex(chunkDTO);

        //When the List (according to Index) is completed and processed
        if (listChunkDTOWithIndex.size() == pitchs) {
          mapIndexListChunkDTO.remove(index);
          TransposerComparator transposerComparator = new TransposerComparator(controlDSP, controlDSP.getAudioformat(), index, sizeChunk, listChunkDTOWithIndex);
          controlDSP.getListFuture().add(controlDSP.getExecutorService().submit(transposerComparator));
        }
        quantity++;
        intCounter.setValue(quantity);
        LOGGER.info(String.format("%s\tConsumes:%s\ttotal:%05d", threadName, chunkDTO, quantity));
      } catch (Exception ex) {
        LOGGER.log(Level.SEVERE, null, ex);
      }
    }
    LOGGER.info(String.format("%s\tReceived:%05d\tQty:%s\tPitchs:%s\tEND\n", threadName, quantity, quantity / pitchs, pitchs));

    return null;
  }

  private List<ChunkDTO> getListChunkDTOByIndex(ChunkDTO chunkDTO) {
    List<ChunkDTO> listChunkDTOWithIndex = mapIndexListChunkDTO.get(chunkDTO.getIndex());
    if (listChunkDTOWithIndex == null) {
      listChunkDTOWithIndex = new ArrayList<>();
      mapIndexListChunkDTO.put(chunkDTO.getIndex(), listChunkDTOWithIndex);
      listChunkDTOWithIndex = mapIndexListChunkDTO.get(chunkDTO.getIndex());
    }
    listChunkDTOWithIndex.add(chunkDTO);
    return listChunkDTOWithIndex;
  }


}

TransposerComparator class.

The optimization is needed in this code, specifically in the transposedNestedList method.

public class TransposerComparator implements Callable<Void> {

  private final ControlDSP controlDSP;
  private final AudioFormat audioformat;
  private final long index;
  private final int sizeChunk;
  private final List<ChunkDTO> listChunkDTOWithIndex;
  private final String threadName;
  private static final Logger LOGGER = Logger.getLogger("SampleComparator");

  public TransposerComparator(ControlDSP controlDSP, AudioFormat audioformat, long index, int sizeChunk, List<ChunkDTO> listChunkDTOWithIndex) {
    this.controlDSP = controlDSP;
    this.audioformat = audioformat;
    this.index = index;
    this.sizeChunk = sizeChunk;
    this.listChunkDTOWithIndex = listChunkDTOWithIndex;
    this.threadName = this.getClass().getSimpleName() + "_" + String.format("%05d", index);
  }

  @Override
  public Void call() throws Exception {
    Thread.currentThread().setName(threadName);
    LOGGER.info(String.format("%s\tINI", threadName));
    try {

      int numBytesPerSample = audioformat.getSampleSizeInBits() / 8;
      int quantitySamples = sizeChunk / numBytesPerSample;
      long baseTime = quantitySamples * index;

      // Convert the List of Chunk Bytes to Nested List of TimePitchValue
      List<List<TimePitchValue>> nestedListTimePitchValue = listChunkDTOWithIndex
          .stream()
          .map(chunkDTO -> {
            return IntStream
                .range(0, quantitySamples)
                .mapToObj(time -> {
                  int value = extractValue(chunkDTO.getChunk(), numBytesPerSample, time);
                  return new TimePitchValue(chunkDTO.getPitch(), baseTime + time, value);
                }).collect(Collectors.toList());
          }).collect(Collectors.toList());

      List<List<TimePitchValue>> timeNestedListTimePitchValue = transposedNestedList(nestedListTimePitchValue);

    } catch (Exception ex) {
      ex.printStackTrace();
      LOGGER.log(Level.SEVERE, null, ex);
      throw ex;
    }
    return null;
  }

  private static int extractValue(byte[] bytesSamples, int numBytesPerSample, int time) {
    byte[] bytesSingleNumber = Arrays.copyOfRange(bytesSamples, time * numBytesPerSample, (time + 1) * numBytesPerSample);
    int value = numBytesPerSample == 2
        ? (UtilClass.Byte2IntLit(bytesSingleNumber[0], bytesSingleNumber[1]))
        : (UtilClass.byte2intSmpl(bytesSingleNumber[0]));
    return value;
  }

  private static List<List<TimePitchValue>> transposedNestedList(List<List<TimePitchValue>> nestedList) {
    List<List<TimePitchValue>> outNestedList = new ArrayList<>();
    nestedList.forEach(pitchList -> {
      pitchList.forEach(pitchValue -> {
        List<TimePitchValue> listTimePitchValueWithTime = listTimePitchValueWithTime(outNestedList, pitchValue.getTime());
        if (!outNestedList.contains(listTimePitchValueWithTime)) {
          outNestedList.add(listTimePitchValueWithTime);
        }
        listTimePitchValueWithTime.add(pitchValue);
      });
    });
    outNestedList.forEach(pitchList -> {
      pitchList.sort(Comparator.comparingInt(TimePitchValue::getValue).reversed());
    });
    return outNestedList;
  }

  private static List<TimePitchValue> listTimePitchValueWithTime(List<List<TimePitchValue>> nestedList, long time) {
    List<TimePitchValue> listTimePitchValueWithTime = nestedList
        .stream()
        .filter(innerList -> innerList.stream()
        .anyMatch(timePitchValue -> timePitchValue.getTime() == time))
        .findAny()
        .orElseGet(ArrayList::new);
    return listTimePitchValueWithTime;
  }

}

My test results:

With 5 seconds in the Generator class: when the line List<List<TimePitchValue>> timeNestedListTimePitchValue = transposedNestedList(nestedListTimePitchValue); in the TransposerComparator class is commented out, the run needs 7 seconds; when it is uncommented, the run needs 211 seconds.

With 10 seconds in the Generator class: when the same line is commented out, the run needs 12 seconds; when it is uncommented, the run needs 574 seconds.

I need to run the application for at least 60 minutes.

To reduce the time consumed, I see two options:

  1. The short-term option, which I have chosen, is to optimize the methods I am currently using.
  2. The other option should work but would take longer: using GPGPU. I don't yet know where to start implementing it.

QUESTIONS

This question is about the first option: what changes do you recommend in the code of the transposedNestedList method in order to improve its speed?

Is there a better alternative to this comparison?

outNestedList.forEach(pitchList -> {
  pitchList.sort(Comparator.comparingInt(TimePitchValue::getValue).reversed());
});
joseluisbz
    The line `outNestedList.contains` can probably be improved: it does a linear search of an array. Thus its complexity is `O(n^2)`. You could use a hash-set to significantly improve the complexity, and so probably the performance. Moreover, I think there is a *lot* to do before trying to use the GPU: working on better data structures, vectorization, parallelization, caching, reducing copies and allocations! Actually, I do not think porting this code to GPUs will result in faster code than just a good CPU implementation because the computational intensity is quite low (i.e. ops/byte). – Jérôme Richard May 12 '20 at 10:41
  • https://stackoverflow.com/q/12191325/811293. https://stackoverflow.com/q/558978/811293 – joseluisbz May 18 '20 at 00:36
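
One possible reading of the hash-based suggestion in the first comment, as a minimal sketch rather than a tested replacement: group the samples by time in a map so that each lookup is a hash access instead of a scan over all lists. The Sample class below is a stand-in for TimePitchValue (whose source is not shown in the question) with only the accessors used here, and GroupByTimeSketch and groupByTime are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class GroupByTimeSketch {

  // Stand-in for the question's TimePitchValue (assumed shape).
  static final class Sample {
    final int pitch;
    final long time;
    final int value;

    Sample(int pitch, long time, int value) {
      this.pitch = pitch;
      this.time = time;
      this.value = value;
    }

    long getTime() { return time; }
    int getValue() { return value; }
  }

  static List<List<Sample>> groupByTime(List<List<Sample>> byPitch) {
    // LinkedHashMap keeps the times in first-seen order, like the original loop.
    Map<Long, List<Sample>> byTime = new LinkedHashMap<>();
    for (List<Sample> pitchRow : byPitch) {
      for (Sample s : pitchRow) {
        byTime.computeIfAbsent(s.getTime(), t -> new ArrayList<>()).add(s);
      }
    }
    List<List<Sample>> out = new ArrayList<>(byTime.values());
    // Same ordering as in the question: highest value first within each time.
    for (List<Sample> sameTime : out) {
      sameTime.sort(Comparator.comparingInt(Sample::getValue).reversed());
    }
    return out;
  }

  public static void main(String[] args) {
    List<List<Sample>> byPitch = Arrays.asList(
        Arrays.asList(new Sample(0, 0L, 5), new Sample(0, 1L, 9)),
        Arrays.asList(new Sample(1, 0L, 7), new Sample(1, 1L, 3)));
    for (List<Sample> sameTime : groupByTime(byPitch)) {
      System.out.println(sameTime.get(0).getTime() + " -> top value " + sameTime.get(0).getValue());
    }
    // prints "0 -> top value 7" then "1 -> top value 9"
  }
}
```

With this shape, each sample costs one map lookup plus one append, so the grouping is roughly linear in the number of samples; the per-time sort is unchanged from the question.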
