16

I'm still in the process of wrapping my brain around how concurrency works in Java. I understand that (if you're subscribing to the Java 5 concurrency model) you implement a Runnable or Callable with a run() or call() method (respectively), and it behooves you to parallelize as much of that implemented method as possible.

But I'm still not understanding something inherent about concurrent programming in Java:

  • How is a Task's run() method assigned the right amount of concurrent work to be performed?

As a concrete example, what if I have an I/O-bound readMobyDick() method that reads the entire contents of Herman Melville's Moby Dick into memory from a file on the local system. And let's just say I want this readMobyDick() method to be concurrent and handled by 3 threads, where:

  • Thread #1 reads the first 1/3rd of the book into memory
  • Thread #2 reads the second 1/3rd of the book into memory
  • Thread #3 reads the last 1/3rd of the book into memory

Do I need to chunk Moby Dick up into three files and pass them each to their own task, or do I just call readMobyDick() from inside the implemented run() method and (somehow) the Executor knows how to break the work up amongst the threads?

I am a very visual learner, so any code examples of the right way to approach this are greatly appreciated! Thanks!

IAmYourFaja

4 Answers

22

You have probably chosen by accident the absolute worst example of parallel activities!

Reading in parallel from a single mechanical disk is actually slower than reading with a single thread, because you are in fact bouncing the mechanical head between different sections of the disk as each thread gets its turn to run. This is best left as a single-threaded activity.

Let's take another example, which is similar to yours but can actually offer some benefit: assume I want to search for the occurrences of a certain word in a huge list of words (this list could even have come from a disk file, but like I said, read by a single thread). Assume I can use 3 threads like in your example, each searching on 1/3rd of the huge word list and keeping a local counter of how many times the searched word appeared.

In this case you'd want to partition the list in 3 parts, pass each part to a different object whose type implements Runnable and have the search implemented in the run method.

The runtime itself has no idea how to do the partitioning or anything like that, you have to specify it yourself. There are many other partitioning strategies, each with its own strengths and weaknesses, but we can stick to the static partitioning for now.

Let's see some code:

class SearchTask implements Runnable {
     private int localCounter = 0;
     private int start; // start index of search
     private int end;
     private List<String> words;
     private String token;

     public SearchTask(int start, int end, List<String> words, String token) {
         this.start = start;
         this.end = end;
         this.words = words;
         this.token = token;
     }

     public void run() {
         for(int i = start; i < end; i++) {
              if(words.get(i).equals(token)) localCounter++;
         }
     }

     public int getCounter() { return localCounter; }
}

// meanwhile in main :)

List<String> words = new ArrayList<String>();
// populate words 
// let's assume you have 30000 words

// create tasks
SearchTask task1 = new SearchTask(0, 10000, words, "John");
SearchTask task2 = new SearchTask(10000, 20000, words, "John");
SearchTask task3 = new SearchTask(20000, 30000, words, "John");

// create threads for each task
Thread t1 = new Thread(task1);
Thread t2 = new Thread(task2);
Thread t3 = new Thread(task3);

// start threads
t1.start();
t2.start();
t3.start();

// wait for threads to finish (join() throws InterruptedException, so handle or declare it)
t1.join();
t2.join();
t3.join();

// collect results
int counter = 0;
counter += task1.getCounter();
counter += task2.getCounter();
counter += task3.getCounter();

This should work nicely. Note that in practical cases you would build a more generic partitioning scheme. You could alternatively use an ExecutorService and implement Callable instead of Runnable if you wish to return a result.

So an alternative example using more advanced constructs:

class SearchTask implements Callable<Integer> {
     private int localCounter = 0;
     private int start; // start index of search
     private int end;
     private List<String> words;
     private String token;

     public SearchTask(int start, int end, List<String> words, String token) {
         this.start = start;
         this.end = end;
         this.words = words;
         this.token = token;
     }

     public Integer call() {
         for(int i = start; i < end; i++) {
              if(words.get(i).equals(token)) localCounter++;
         }
         return localCounter;
     }        
}

// meanwhile in main :)

List<String> words = new ArrayList<String>();
// populate words 
// let's assume you have 30000 words

// create tasks
List<Callable<Integer>> tasks = new ArrayList<Callable<Integer>>();
tasks.add(new SearchTask(0, 10000, words, "John"));
tasks.add(new SearchTask(10000, 20000, words, "John"));
tasks.add(new SearchTask(20000, 30000, words, "John"));

// create thread pool and start tasks
ExecutorService exec = Executors.newFixedThreadPool(3);
List<Future<Integer>> results = exec.invokeAll(tasks); // blocks until all tasks have completed

// collect results (invokeAll has already waited for the tasks to finish)
int counter = 0;
for (Future<Integer> f : results) {
    counter += f.get();
}
exec.shutdown();
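
As a rough sketch of the "more generic partitioning scheme" mentioned above, reusing the Callable version of SearchTask: the partition helper below is my own illustration, not part of the original answer, and it assumes numTasks is no larger than words.size().

// builds one SearchTask per thread over roughly equal index ranges
static List<Callable<Integer>> partition(List<String> words, String token, int numTasks) {
    List<Callable<Integer>> tasks = new ArrayList<Callable<Integer>>();
    int chunk = words.size() / numTasks;
    for (int i = 0; i < numTasks; i++) {
        int start = i * chunk;
        // the last range absorbs any remainder left over by the integer division
        int end = (i == numTasks - 1) ? words.size() : start + chunk;
        tasks.add(new SearchTask(start, end, words, token));
    }
    return tasks;
}

With such a helper, the three hand-built tasks above collapse to exec.invokeAll(partition(words, "John", 3)), and changing the thread count no longer requires editing the index arithmetic by hand.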
Tudor
  • 1
    So what would be a good example of a task that would benefit from multithreading? I really don't care at all about reading files from disk - I care about seeing a living, breathing (**code**) example of how work is chunked up and fed to tasks. – IAmYourFaja May 16 '12 at 19:19
  • @herpylderp: Now it should be complete. – Tudor May 16 '12 at 19:33
  • Wow great answer @Tudor! My only followup is: when you speak of partitioning schemes (you mentioned "static partitioning") are these Java constructs available in the concurrency package or are these theoretical strategies that are common to any language/environment? I ask because they seem to be at the heart of what I'm not "getting". Thanks again (and +1)! – IAmYourFaja May 16 '12 at 19:38
  • @herpylderp: An alternative scheme is "on-demand". This is usually used when you have many more tasks than available threads and the tasks take varying amounts of time to finish. In this case it's better to use a thread pool and have the threads take tasks "on-demand" instead of assigning them manually in the beginning, to ensure the work is load-balanced. – Tudor May 16 '12 at 19:43
  • @herpylderp: I've also added an alternative version of my example using thread pools and futures. – Tudor May 16 '12 at 19:43
  • Great - thanks @Tudor - but now I'm not "getting" how thread pools chunk up work dynamically (as opposed to this static partitioning). I suspect I'll be posting a similar (pool-related) question sometime soon... thanks again. – IAmYourFaja May 16 '12 at 19:46
  • 1
    @herpylderp: A thread pool (like the one I used above) stores a queue of tasks internally and a bunch of threads. The threads loop continuously looking if there are tasks available in the queue and if there are, they take one and execute it, before returning to take more. The user simply puts tasks in the queue using `submit` and the threads execute them in the way I explained. – Tudor May 16 '12 at 19:48
  • Ahh, so you would still need to partition the problem manually (at some point) and then `submit` them to the pool? – IAmYourFaja May 16 '12 at 19:59
  • @herpylderp: Yes, unfortunately there is no magic way to partition stuff, at least not in java. :) – Tudor May 16 '12 at 20:03
  • I am executing a test for the same use case - reading a single file from multiple threads. I found that having multiple threads degrades the performance if the underlying storage is a SATA disk drive but improves the performance if it's a SAS drive. Would it be because of the point-to-point technology used in SAS or is my test doing something incorrect? – Andy Dufresne Jun 24 '13 at 13:20
2

You picked a bad example, as Tudor was so kind to point out. Spinning disk hardware is subject to physical constraints of moving platters and heads, and the most efficient read implementation is to read each block in order, which reduces the need to move the head or wait for the disk to align.

That said, some operating systems don't always store things contiguously on disk, and for those who remember, defragmentation could provide a disk performance boost if your OS / filesystem didn't do the job for you.

As you mentioned wanting a program that would benefit, let me suggest a simple one, matrix addition.

Assuming you make one thread per core, you can trivially divide the rows of any two matrices to be added into N groups (one for each thread). Matrix addition (if you recall) works as such:

A + B = C

or

[ a11, a12, a13 ]   [ b11, b12, b13 ]   [ (a11+b11), (a12+b12), (a13+b13) ]
[ a21, a22, a23 ] + [ b21, b22, b23 ] = [ (a21+b21), (a22+b22), (a23+b23) ]
[ a31, a32, a33 ]   [ b31, b32, b33 ]   [ (a31+b31), (a32+b32), (a33+b33) ]

So to distribute this across N threads, we simply take each row's index modulo the number of threads to get the "thread id" of the thread that will add it.

matrix with 20 rows across 3 threads
row % 3 == 0 (for rows 0, 3, 6,  9, 12, 15, and 18)
row % 3 == 1 (for rows 1, 4, 7, 10, 13, 16, and 19)
row % 3 == 2 (for rows 2, 5, 8, 11, 14, and 17)
// row 20 doesn't exist, because we number rows from 0

Now each thread "knows" which rows it should handle, and the results "per row" can be computed trivially because they do not cross into another thread's domain of computation.

All that is needed now is a "result" data structure which tracks when the values have been computed; when the last value is set, the computation is complete. In this "fake" illustration of a matrix addition result computed with two threads, the answer arrives in approximately half the time (a code sketch follows the illustration).

// the following assumes that threads don't get rescheduled to different cores for 
// illustrative purposes only.  Real Threads are scheduled across cores due to
// availability and attempts to prevent unnecessary core migration of a running thread.
[ done, done, done ] // filled in at about the same time as row 2 (runs on core 3)
[ done, done, done ] // filled in at about the same time as row 1 (runs on core 1)
[ done, done, .... ] // filled in at about the same time as row 4 (runs on core 3)
[ done, ...., .... ] // filled in at about the same time as row 3 (runs on core 1)
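
For concreteness, here is a minimal Java sketch of the row-modulo scheme described above. The AddTask class and the CountDownLatch used as the "computation is complete" signal are my own illustrative choices, not code from this answer:

import java.util.concurrent.CountDownLatch;

class AddTask implements Runnable {
    private final double[][] a, b, c; // c receives the result
    private final int threadId;
    private final int threadCount;
    private final CountDownLatch done;

    AddTask(double[][] a, double[][] b, double[][] c,
            int threadId, int threadCount, CountDownLatch done) {
        this.a = a; this.b = b; this.c = c;
        this.threadId = threadId;
        this.threadCount = threadCount;
        this.done = done;
    }

    public void run() {
        // handle only the rows whose index modulo threadCount equals our thread id
        for (int row = threadId; row < a.length; row += threadCount) {
            for (int col = 0; col < a[row].length; col++) {
                c[row][col] = a[row][col] + b[row][col];
            }
        }
        done.countDown(); // signal that this thread's share of rows is finished
    }
}

// usage with 3 threads and 20x20 matrices a and b (both already populated):
// double[][] c = new double[20][20];
// CountDownLatch done = new CountDownLatch(3);
// for (int id = 0; id < 3; id++) {
//     new Thread(new AddTask(a, b, c, id, 3, done)).start();
// }
// done.await(); // when the latch opens, every cell of c has been filled in

Because each task writes only the rows assigned to its id, no two threads ever touch the same cell, so the result array needs no locking; the latch both signals completion and makes the workers' writes visible to the waiting thread.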

More complex problems can be solved by multithreading, and different problems are solved with different techniques. I purposefully picked one of the simplest examples.

Edwin Buck
2

you implement a Runnable or Callable with a run() or call() method (respectively), and it behooves you to parallelize as much of that implemented method as possible.

A task represents a discrete unit of work.
Loading a file into memory is a discrete unit of work, so this activity can be delegated to a background thread, i.e. a background thread runs the task of loading the file.
It is a discrete unit of work since it has no other dependencies needed in order to do its job (load the file) and it has discrete boundaries.
What you are asking is to divide this further into subtasks, i.e. one thread loads the first 1/3 of the file while another thread loads the next 1/3, etc.
If you were able to divide the task into further subtasks, it would not be a task in the first place, by definition. So loading a file is a single task in itself.

To give you an example:
Let's say that you have a GUI and you need to present to the user data from 5 different files. To present them you also need to prepare some data structures to process the actual data.
All of these are separate tasks.
E.g. the loading of the files is 5 different tasks, so it could be done by 5 different threads.
The preparation of the data structures could be done by a different thread.
The GUI, of course, runs in another thread.
All of these can happen concurrently.
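
A minimal sketch of that arrangement, using an ExecutorService (the file names are placeholders, and in a real GUI the results would be handed back on the event thread rather than printed):

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class LoadFilesExample {
    public static void main(String[] args) throws Exception {
        String[] paths = { "a.txt", "b.txt", "c.txt", "d.txt", "e.txt" }; // placeholder names

        // loading each file is a discrete task, so each gets its own pool thread
        ExecutorService pool = Executors.newFixedThreadPool(5);
        List<Future<String>> contents = new ArrayList<Future<String>>();
        for (final String path : paths) {
            contents.add(pool.submit(new Callable<String>() {
                public String call() throws IOException {
                    return new String(Files.readAllBytes(Paths.get(path)), StandardCharsets.UTF_8);
                }
            }));
        }

        // meanwhile, another task (or this thread) could prepare the data structures;
        // here we simply wait for each load and report its size
        for (Future<String> f : contents) {
            System.out.println("loaded " + f.get().length() + " chars");
        }
        pool.shutdown();
    }
}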

Cratylus
-1

If your system supports high-throughput I/O, here is how you can do it:

How to read a file using multiple threads in Java when a high throughput(3GB/s) file system is available

Here is the solution to read a single file with multiple threads.

Divide the file into N chunks, read each chunk in a thread, then merge them in order. Beware of lines that cross chunk boundaries. This is the basic idea, as suggested by user slaks.

Benchmarks of the implementation below, reading a single 20 GB file with multiple threads:

1 Thread : 50 seconds : 400 MB/s

2 Threads: 30 seconds : 666 MB/s

4 Threads: 20 seconds : 1GB/s

8 Threads: 60 seconds : 333 MB/s

Equivalent Java7 readAllLines() : 400 seconds : 50 MB/s

Note: this may only work on systems that are designed to support high-throughput I/O, and not on a typical personal computer.

Here are the essential bits of the code; for complete details, follow the link.

import static java.lang.Math.toIntExact;

import java.io.FileInputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.Charset;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FileRead implements Runnable
{
    private final FileChannel _channel;
    private final long _startLocation;
    private final int _size;
    private final int _sequence_number;

    public FileRead(long loc, int size, FileChannel chnl, int sequence)
    {
        _startLocation = loc;
        _size = size;
        _channel = chnl;
        _sequence_number = sequence;
    }

    @Override
    public void run()
    {
        try
        {
            System.out.println("Reading the channel: " + _startLocation + ":" + _size);

            //allocate memory
            ByteBuffer buff = ByteBuffer.allocate(_size);

            //Read file chunk to RAM (positional reads on a FileChannel are safe from multiple threads)
            _channel.read(buff, _startLocation);

            //chunk to String; _sequence_number is what you would use to merge the chunks in order
            String string_chunk = new String(buff.array(), Charset.forName("UTF-8"));

            System.out.println("Done Reading the channel: " + _startLocation + ":" + _size);
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
    }

    //args[0] is the path of the file to read
    //args[1] is the size of the thread pool; try different values to find the sweet spot
    public static void main(String[] args) throws Exception
    {
        FileInputStream fileInputStream = new FileInputStream(args[0]);
        FileChannel channel = fileInputStream.getChannel();
        long remaining_size = channel.size(); //get the total number of bytes in the file
        long chunk_size = remaining_size / Integer.parseInt(args[1]); //file_size/threads

        //thread pool
        ExecutorService executor = Executors.newFixedThreadPool(Integer.parseInt(args[1]));

        long start_loc = 0; //file pointer
        int i = 0; //loop counter
        while (remaining_size >= chunk_size)
        {
            //submits a new read task
            executor.execute(new FileRead(start_loc, toIntExact(chunk_size), channel, i));
            remaining_size = remaining_size - chunk_size;
            start_loc = start_loc + chunk_size;
            i++;
        }

        //load the last remaining piece
        executor.execute(new FileRead(start_loc, toIntExact(remaining_size), channel, i));

        //Tear down: stop accepting new tasks and let the queued reads finish
        executor.shutdown();
    }
}
sanketshah