
I have multiple threads running in my thread pool. Each thread reads a huge file and returns the data from that file as a List.

The code looks like this:

class Writer{


   ArrayList<Integer> finalListWhereDataWillBeWritten = new ArrayList<Integer>();
   for(query q : allQueries){ //all the read queries to read file

      threadPool.submit(new GetDataFromFile(fileName,filePath));        

   }//all the read queries have been submitted.

}

I know that the following section of code has to go somewhere, but I don't know where to place it. If I put it right after submit() inside the for loop, the data won't be there yet, because each file is very large and may not have finished processing.

synchronized(finalListWhereDataWillBeWritten) {
  // process the data obtained from a single file and add it to the target list
  finalListWhereDataWillBeWritten.addAll(dataFromSingleThread);
}

So can anyone tell me where I should place this chunk of code, and what else I need to ensure so that the critical-section problem does not occur?

class GetDataFromFile implements Runnable<List<Integer>>{

   private String fileName;
   private String filePath;

   public List<Integer> run(){
       //code for streaming the file fileName 
       return dataObtainedFromThisFile;
   }

}

And do I need to use wait()/notifyAll() in my code, given that I'm only reading data from files in parallel threads and placing it in a shared List?

veer

2 Answers


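Instead of reinventing the wheel, you should simply implement Callable<List<Integer>> and submit it to the JDK's standard ExecutorService. Then call get() on each Future, which waits for that task to complete, and collect the results into the list. Because only the submitting thread ever touches the target list, no synchronized block and no wait()/notifyAll() are needed.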

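final ExecutorService threadPool = 
    Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
final List<Future<List<Integer>>> futures = new ArrayList<>();
for (query q : allQueries) {
  futures.add(threadPool.submit(new GetDataFromFile(fileName, filePath)));
}
// Only this thread touches the list, so no synchronization is needed.
// get() blocks until its task is done; it throws InterruptedException and ExecutionException.
for (Future<List<Integer>> f : futures) {
    finalListWhereDataWillBeWritten.addAll(f.get());
}
threadPool.shutdown(); // let the pool's threads exit once all work is done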
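A minimal, self-contained sketch of the Callable/Future approach above, so it can be compiled and run as-is. Assumptions: GetDataFromFile here is a hypothetical stand-in that fabricates two integers from an id instead of streaming a real file, and readAll is an illustrative helper name, not part of any API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureCollect {

    // Hypothetical stand-in for GetDataFromFile: instead of reading a file,
    // it returns two numbers derived from its id.
    static class GetDataFromFile implements Callable<List<Integer>> {
        private final int fileId;

        GetDataFromFile(int fileId) {
            this.fileId = fileId;
        }

        @Override
        public List<Integer> call() {
            return Arrays.asList(fileId * 10, fileId * 10 + 1);
        }
    }

    static List<Integer> readAll(int fileCount) throws InterruptedException, ExecutionException {
        ExecutorService threadPool =
            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        try {
            List<Future<List<Integer>>> futures = new ArrayList<>();
            for (int i = 0; i < fileCount; i++) {
                futures.add(threadPool.submit(new GetDataFromFile(i)));
            }
            // Only this thread touches the result list, so no locking is needed.
            List<Integer> result = new ArrayList<>();
            for (Future<List<Integer>> f : futures) {
                result.addAll(f.get()); // waits for this particular task to finish
            }
            return result;
        } finally {
            threadPool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(readAll(3)); // prints [0, 1, 10, 11, 20, 21]
    }
}
```

The results come back in submission order because get() is called on the futures in that order. If you would rather handle each result as soon as its task finishes, java.util.concurrent.ExecutorCompletionService is the standard tool for that.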

All of this assumes you are on a Java version below 8. With Java 8 you could instead use a parallel stream:

final List<Integer> finalListWhereDataWillBeWritten =
  allQueries.parallelStream()
            .flatMap(q -> getDataFromFile(q.fileName, q.filePath))
            .collect(toList());
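A runnable sketch of the Java 8 variant, under two labeled assumptions: Query is a hypothetical holder class with fileName and filePath fields, and getDataFromFile is a hypothetical reader returning a List<Integer>. With that return type, .stream() is needed inside flatMap, since flatMap expects a function that returns a Stream.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ParallelCollect {

    // Hypothetical query type holding the file coordinates.
    static class Query {
        final String fileName;
        final String filePath;

        Query(String fileName, String filePath) {
            this.fileName = fileName;
            this.filePath = filePath;
        }
    }

    // Hypothetical reader: a real one would stream the file; here the "data"
    // is just the length of each string, to keep the example self-contained.
    static List<Integer> getDataFromFile(String fileName, String filePath) {
        return Arrays.asList(fileName.length(), filePath.length());
    }

    static List<Integer> readAll(List<Query> allQueries) {
        return allQueries.parallelStream()
                .flatMap(q -> getDataFromFile(q.fileName, q.filePath).stream())
                // on an ordered source, collect keeps encounter order even in parallel
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Query> queries = Arrays.asList(
                new Query("a.txt", "/tmp"),
                new Query("bb.txt", "/data"));
        System.out.println(readAll(queries)); // prints [5, 4, 6, 5]
    }
}
```

Note that parallel streams run on the shared common ForkJoinPool, which is sized for CPU-bound work; for many blocking file reads, an explicitly sized ExecutorService may be the better fit.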
Marko Topolnik

UPDATE: Please consider the answer provided by Marko, which is far better.

If you want to ensure that your threads all complete before you work on your list, do the following:

import java.util.List;
import java.util.Vector;

public class ThreadWork {

  public static void main(String[] args) {

    int count = 5;
    Thread[] threads = new ListThread[count];
    List<String> masterList = new Vector<String>();

    for(int index = 0; index < count; index++) {
      threads[index] = new ListThread(masterList, "Thread " + (index + 1));
      threads[index].start();
    }
    while(isOperationRunning(threads)) {
      // do nothing
    }

    System.out.println("Done!! Print Your List ...");

    for(String item : masterList){
      System.out.println("[" + item + "]");
    }
  }

  private static boolean isOperationRunning(Thread[] threads) {
    boolean running = false;

    for(Thread thread : threads) {
      if(thread.isAlive()) {
        running = true;
        break;
      }
    }
    return running;
  }
}

class ListThread extends Thread {
  private static String items[] = { "A", "B", "C", "D"};
  private List<String> list;
  private String name;

  public ListThread(List<String> masterList, String threadName) {
    list = masterList;
    name = threadName;
  }

  public void run() {
    for(int i = 0; i < items.length;++i) {
      randomWait();
      String data = "Thread [" + name + "][" + items[i] + "]";
      System.out.println( data );
      list.add( data );
    }
  }

  private void randomWait() {
    try {
      Thread.sleep((long)(3000 * Math.random()));
    }
    catch (InterruptedException x) {
      Thread.currentThread().interrupt(); // restore the interrupt flag instead of swallowing it
    }
  }
}
Constantin
  • Well, your code is fine and will do what I want... but don't you think that polling continuously will increase the execution time of this program? – veer Jul 12 '15 at 13:07
  • The point of threads is to run processes concurrently. If I have ten files to process, I can run ten threads at once instead of processing the files one at a time. Sometimes, however, you need to act upon the results of the Thread work only AFTER ALL the threads are complete. Again, if this was not what you want, let me know and I will delete this answer – Constantin Jul 12 '15 at 13:15
  • Why do you keep offering to delete it? Maybe your answer can help someone in the future. – veer Jul 12 '15 at 13:17
  • My intent here is to help you – Constantin Jul 12 '15 at 13:20
  • You may also consider using a synchronized list (thread-safe master list) or a Vector to add the data to the master list within each individual Thread itself... Just pass a reference to the master list to each Thread... See http://stackoverflow.com/questions/2444005/how-do-i-make-my-arraylist-thread-safe-another-approach-to-problem-in-java – Constantin Jul 12 '15 at 13:29
  • Agreed... but if I pass my list to the Runnable class, how can I make sure that the data I modify in the Runnable class will be reflected in the main thread's class? – veer Jul 12 '15 at 13:37
  • While searching for a solution I found this: http://stackoverflow.com/questions/17827022/what-is-countdown-latch-in-java-multithreading. This is another very useful approach. – veer Jul 12 '15 at 13:38
  • Any suggestions on passing a reference to my list so that I can add data from there? Actually, I'm a newbie in Java. – veer Jul 12 '15 at 13:46
  • I modified the code in the answer to pass a reference to a Vector, so there is no need to issue an addAll; each Thread adds its items immediately. Note, however, that your results will no longer be in order, of course. – Constantin Jul 12 '15 at 13:58
  • Fine... I think you should also lock masterList, as multiple threads are writing into it. – veer Jul 12 '15 at 14:15
  • I do like the example you noted in the previous question, however. Many ways to skin a cat... – Constantin Jul 12 '15 at 14:24
  • Polling the threads via `isAlive()` will raise the CPU load and *will* increase the execution time unless you have more CPU cores than threads. There is no reason for such a polling loop. Just call `join()` once on each thread, the order doesn’t matter, and after that, you know that all threads are done. – Holger Jul 16 '15 at 15:51
  • Thank you for the clarification. I hope op considers accepting Marko's answer instead ... – Constantin Jul 16 '15 at 17:57
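Following Holger's suggestion, here is a sketch of Constantin's example with the isAlive() polling loop replaced by join(). It also uses Collections.synchronizedList as the alternative to Vector mentioned in the comments. The class and method names are illustrative only, and the random sleeps are left out for brevity.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class JoinInsteadOfPolling {

    static List<String> runWorkers(int count) throws InterruptedException {
        // Thread-safe list shared by all workers (an alternative to Vector).
        List<String> masterList = Collections.synchronizedList(new ArrayList<>());
        Thread[] threads = new Thread[count];

        for (int i = 0; i < count; i++) {
            final String name = "Thread " + (i + 1);
            threads[i] = new Thread(() -> {
                for (String item : new String[] {"A", "B", "C", "D"}) {
                    masterList.add("Thread [" + name + "][" + item + "]");
                }
            });
            threads[i].start();
        }

        // join() waits without burning CPU; the order of the joins does not matter.
        for (Thread t : threads) {
            t.join();
        }
        return masterList;
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> result = runWorkers(5);
        System.out.println("Done!! " + result.size() + " items"); // prints "Done!! 20 items"
    }
}
```

join() also addresses the visibility concern raised in the comments: the Java Memory Model guarantees that all actions of a thread happen-before another thread returns from join() on it, so after the loop the main thread sees every item the workers added.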