I am trying to read a file in chunks and pass each chunk to a thread that counts how many times each byte value occurs in that chunk. When I pass the whole file to a single thread I get the correct result, but when I split it across multiple threads the result becomes very strange. Here's my code:

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.HashSet;
import java.util.Scanner;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Main{

    public static void main(String[] args) throws InterruptedException, ExecutionException, IOException 
    {
        // get number of threads to be run
        Scanner in = new Scanner(System.in);
        int numberOfThreads = in.nextInt();

        // read file
        File file = new File("testfile.txt");
        long fileSize = file.length();
        long chunkSize = fileSize / numberOfThreads;

        FileInputStream input = new FileInputStream(file);
        byte[] buffer = new byte[(int)chunkSize];

        ExecutorService pool = Executors.newFixedThreadPool(numberOfThreads);
        Set<Future<int[]>> set = new HashSet<Future<int[]>>();

        while(input.available() > 0)
        {

            if(input.available() < chunkSize)
            {
                chunkSize = input.available();
            }

            input.read(buffer, 0, (int) chunkSize);

            Callable<int[]> callable = new FrequenciesCounter(buffer);
            Future<int[]> future = pool.submit(callable);
            set.add(future);
        }

        // let's assume we will use extended ASCII characters only
        int alphabet = 256;

        // hold how many times each character is contained in the input file
        int[] frequencies = new int[alphabet];

        // sum the frequencies from each thread
        for(Future<int[]> future: set)
        {
            for(int i = 0; i < alphabet; i++)
            {
                frequencies[i] += future.get()[i];
            }
        }

        input.close();

        for(int i = 0; i< frequencies.length; i++)
        {
            if(frequencies[i] > 0) System.out.println((char)i + "  " + frequencies[i]);
        }
    }

}

// helper class for multithreaded frequency counting
class FrequenciesCounter implements Callable<int[]>
{
    private int[] frequencies = new int[256];
    private byte[] input;

    public FrequenciesCounter(byte[] buffer)
    {
        input = buffer;
    }

    public int[] call()
    {


        for(int i = 0; i < input.length; i++)
        {
            frequencies[(int)input[i]]++;
        }

        return frequencies;
    }
}

My testfile.txt is aaaaaaaaaaaaaabbbbcccccc. With 1 thread the output is:

a  14
b  4
c  6

With 2 threads the output is:

a  4
b  8
c  12

With 3 threads the output is:

b  6
c  18

Other thread counts give similarly strange results that I cannot figure out. Could anybody help?

barni
  • Your code doesn't compile. So it can't run. – JB Nizet Jun 24 '17 at 16:45
  • 1
    Also, never use available(), and never assume that a call to read() will read the exact number of bytes you asked to read, i.e. don't ignore the value returned by read(). The Java IO tutorial shows how to correctly read bytes. – JB Nizet Jun 24 '17 at 16:49
  • I'm sorry, I had forgotten some test stuff, now it should be fine. Thank you, I will look there – barni Jun 24 '17 at 16:55
  • Aside: Because of `(char)i`, the only one of the over 220 "[extended ASCII](https://en.wikipedia.org/wiki/Extended_ASCII)" encodings your program handles is just [ISO 8859-1](http://ftp.unicode.org/Public/MAPPINGS/ISO8859/8859-1.TXT). So, you might as well say so because that's what the person using the program must supply. – Tom Blodget Jun 24 '17 at 19:27
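
The two points raised in the comments above (check the value returned by read(), and be careful with byte values above 127) can be sketched as follows. This is only a minimal illustration, not the code from the question: the class name ChunkReading and the helpers readFully and count are hypothetical names introduced here. Java's byte type is signed, so the sketch masks each byte with 0xFF before using it as an array index.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical helper class, introduced only for this illustration.
class ChunkReading {

    // Fills the buffer as far as possible, looping because a single read()
    // may return fewer bytes than requested. Returns the number of bytes
    // actually read (less than buffer.length only at end of stream).
    static int readFully(InputStream in, byte[] buffer) throws IOException {
        int total = 0;
        while (total < buffer.length) {
            int n = in.read(buffer, total, buffer.length - total);
            if (n < 0) {
                break; // end of stream
            }
            total += n;
        }
        return total;
    }

    // Counts the byte values in buffer[0..length). The mask maps the signed
    // byte range -128..127 onto the array indices 0..255.
    static int[] count(byte[] buffer, int length) {
        int[] frequencies = new int[256];
        for (int i = 0; i < length; i++) {
            frequencies[buffer[i] & 0xFF]++;
        }
        return frequencies;
    }

    public static void main(String[] args) throws IOException {
        // In-memory stand-in for the test file from the question.
        byte[] data = "aaaaaaaaaaaaaabbbbcccccc".getBytes(StandardCharsets.ISO_8859_1);
        InputStream in = new ByteArrayInputStream(data);
        byte[] buffer = new byte[16];
        int[] total = new int[256];
        int n;
        while ((n = readFully(in, buffer)) > 0) {
            int[] part = count(buffer, n); // count only the bytes actually read
            for (int i = 0; i < total.length; i++) {
                total[i] += part[i];
            }
        }
        System.out.println("a " + total['a'] + ", b " + total['b'] + ", c " + total['c']);
    }
}
```

On Java 9 and later, InputStream.readNBytes(byte[], int, int) performs the same kind of loop as readFully, so the helper may be unnecessary there.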

2 Answers

Every thread is using the same buffer, and one thread will be overwriting the buffer as another thread is trying to process it.

You need to make sure every thread has its own buffer that nobody else can modify.
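
One way to give every task its own data is to keep a single read buffer but hand each task a private copy of the bytes that were just read. The following is a minimal sketch under that assumption; the class name PrivateBuffers and the methods countStream and count are hypothetical and do not come from the question.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical class, introduced only for this illustration.
class PrivateBuffers {

    // Counts the byte values of one chunk; the mask avoids negative indices.
    static int[] count(byte[] chunk) {
        int[] frequencies = new int[256];
        for (byte b : chunk) {
            frequencies[b & 0xFF]++;
        }
        return frequencies;
    }

    static int[] countStream(InputStream in, int chunkSize, int threads)
            throws IOException, InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<int[]>> futures = new ArrayList<>();
            byte[] readBuffer = new byte[chunkSize]; // reused for every read
            int n;
            while ((n = in.read(readBuffer)) > 0) {
                // Arrays.copyOf allocates a new array holding exactly the n
                // bytes just read, so later reads cannot overwrite it.
                final byte[] chunk = Arrays.copyOf(readBuffer, n);
                Callable<int[]> task = () -> count(chunk);
                futures.add(pool.submit(task));
            }
            int[] total = new int[256];
            for (Future<int[]> future : futures) {
                int[] part = future.get(); // wait once per task
                for (int i = 0; i < total.length; i++) {
                    total[i] += part[i];
                }
            }
            return total;
        } finally {
            pool.shutdown();
        }
    }
}
```

Because each copy has exactly the length that read() returned, a short read or a shorter final chunk does not add stray zero bytes to the counts.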

Joe C

Create a separate byte[] array for every thread.

    public class Main {

        public static void main(String[] args) throws InterruptedException, ExecutionException, IOException {
            // get number of threads to be run
            Scanner in = new Scanner(System.in);
            int numberOfThreads = in.nextInt();

            // read file
            File file = new File("testfile.txt");
            long fileSize = file.length();
            long chunkSize = fileSize / numberOfThreads;

            FileInputStream input = new FileInputStream(file);

            ExecutorService pool = Executors.newFixedThreadPool(numberOfThreads);
            Set<Future<int[]>> set = new HashSet<Future<int[]>>();

            while (input.available() > 0) {
                // the last chunk may be shorter than the others
                if (input.available() < chunkSize) {
                    chunkSize = input.available();
                }

                // create a separate buffer for every task, sized to the current
                // chunk so that no unused trailing zero bytes get counted
                byte[] buffer = new byte[(int) chunkSize];

                input.read(buffer, 0, (int) chunkSize);

                Callable<int[]> callable = new FrequenciesCounter(buffer);
                Future<int[]> future = pool.submit(callable);
                set.add(future);
            }

            // let's assume we will use extended ASCII characters only
            int alphabet = 256;

            // hold how many times each character is contained in the input file
            int[] frequencies = new int[alphabet];

            // sum the frequencies from each thread
            for (Future<int[]> future : set) {
                for (int i = 0; i < alphabet; i++) {
                    frequencies[i] += future.get()[i];
                }
            }

            input.close();

            for (int i = 0; i < frequencies.length; i++) {
                if (frequencies[i] > 0)
                    System.out.println((char) i + "  " + frequencies[i]);
            }
        }

    }
gati sahu
  • One further question: do you have any idea why the performance gets worse the more threads I use? – barni Jun 24 '17 at 18:14
  • @barni Managing more threads is more work for the pool and the operating system, and it causes more processor [context switches](https://en.wikipedia.org/wiki/Context_switch). The rule of thumb is to have the [same number of threads as processor cores](https://stackoverflow.com/a/5499261/2226988), but you are right to test it out. – Tom Blodget Jun 24 '17 at 19:37
  • @barni You might also try moving the file reading into each thread and matching the buffer size to the filesystem's allocation size or even the HDD controller's block read size. The file may not be laid out sequentially on the disk, so by not serializing the reads you might gain something by letting the I/O systems pick their order. (This changes the separation of concerns, though, unless you use a [MappedByteBuffer](https://docs.oracle.com/javase/8/docs/api/java/nio/MappedByteBuffer.html).) – Tom Blodget Jun 24 '17 at 19:43
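
The suggestions in the last two comments (roughly one task per processor core, and letting each task read its own part of the file through a MappedByteBuffer) could look something like the sketch below. It is an illustration under stated assumptions rather than a tuned implementation: the class name MappedCounter and its count method are hypothetical, and each mapped region is assumed to be smaller than 2 GB, which is the limit of a single FileChannel.map call.

```java
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical class, introduced only for this illustration.
class MappedCounter {

    static int[] count(Path file, int tasks)
            throws IOException, InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(tasks);
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
            long size = channel.size();
            long chunk = (size + tasks - 1) / tasks; // ceiling division
            List<Future<int[]>> futures = new ArrayList<>();
            for (long pos = 0; pos < size; pos += chunk) {
                long length = Math.min(chunk, size - pos);
                // Each task gets its own mapped region, so nothing is shared.
                final MappedByteBuffer region =
                        channel.map(FileChannel.MapMode.READ_ONLY, pos, length);
                Callable<int[]> task = () -> {
                    int[] frequencies = new int[256];
                    while (region.hasRemaining()) {
                        frequencies[region.get() & 0xFF]++;
                    }
                    return frequencies;
                };
                futures.add(pool.submit(task));
            }
            int[] total = new int[256];
            for (Future<int[]> future : futures) {
                int[] part = future.get();
                for (int i = 0; i < total.length; i++) {
                    total[i] += part[i];
                }
            }
            return total;
        } finally {
            pool.shutdown();
        }
    }
}
```

A call such as MappedCounter.count(Paths.get("testfile.txt"), Runtime.getRuntime().availableProcessors()) would follow the rule of thumb from the comment. Whether it is actually faster than a single thread depends on the file size and the storage device, so it is worth measuring.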