0

I'm new to Java, and I need some help working on this program. This is a small part of a large class project, and I must use multithreading.

Here's what I want to do algorithmically:

while (there is still input left, store chunk of input in <chunk>)
{
    if there is not a free thread in my array then
      wait until a thread finishes

    else there is a free thread then
       apply the free thread to <chunk> (which will do something to chunk and output it).  
              Note: The ordering of the chunks being output must be the same as input
}

So, the main things I don't know how to do:

  • How can I check whether or not there's a free thread in the array? I know that there is a function ThreadAlive, but it seems super inefficient to poll every single thread every time in my loop.
  • If there is no free thread, how can I wait until one has finished?
  • The ordering is important. How can I preserve the ordering in which the threads output? As in, the order of the output needs to match the order of the input. How can I guarantee this synchronization?
  • How do I even pass the chunk to my thread? Can I just use the Runnable interface to do this?

Any help with these four bullets is greatly appreciated. Since I'm a super noob, code samples would help significantly.

(side-note: Making an array of threads was just an idea of mine to handle the user defined number of threads. If you have a better way to handle this you're welcome to suggest it!)

Casey Patton
  • If you require the output to be in the same order as the input, multithreading might not be the way to go, since you can never guarantee that the threads will finish in the order they were created. Your first thread could finish fifth, for example. – Ben Oct 26 '11 at 07:34
  • I would not recommend using Threads for a sequential program, as you suggest in bullet three ('The ordering is important'). Creating a list of Threads sounds very strange; thread management is typically something you let the Java Virtual Machine handle. My guess is that you have a Cobol-like background, and I would therefore recommend starting by reading a (Java) object-oriented starter's guide. – Dave Oct 26 '11 at 07:45
  • In addition to what @Dave said, if you are really new to Java then a multithreaded solution might cause lots of problems. You might use it for experimenting, but not for production. – mohdajami Oct 26 '11 at 07:55

5 Answers

5

It sounds like you basically have a producer/consumer model, which can be solved with an ExecutorService and a BlockingQueue. Here is a similar question with a similar answer:

producer/consumer work queues

ryanbrainard
  • I feel like my case is sort of different. Rather, I'm never going to have to wait for the input. My input is coming from a file and will not run out. What I'm waiting for is for my threads to finish. I'm having trouble conforming my problem to this model. – Casey Patton Oct 26 '11 at 19:48
  • @Casey Patton, what your problem describes is the basic implementation of an `ExecutorService`. No need to invent your own, just use what exists. I've tried to create an outline for you to follow in a separate answer. Also, it should be noted that a `Thread` can not be started twice. You can't just reuse a Thread instance. – Tim Bender Oct 26 '11 at 21:18
1

As @altaiojok mentioned, you want to use an ExecutorService and BlockingQueue. The basic algorithm works like this:

ExecutorService executor = Executors.newFixedThreadPool(...); //or newCachedThreadPool, etc...
BlockingQueue<Future<?>> outputQueue = new LinkedBlockingQueue<Future<?>>();

//To be run by an input processing thread
void submitTasks() throws IOException {
    BufferedReader input = ... //going to assume you have a file that you want to read, this could be any method of input (file, keyboard, network, etc...)
    String line = input.readLine();
    while (line != null) {
        //Futures are queued in submission order, so results are later consumed in input order
        outputQueue.add(executor.submit(new YourCallableImplementation(line)));
        line = input.readLine();
    }
}

//To be run by a different output processing thread
void processTaskOutput() {
    try {
        while (true) {
            Future<?> resultFuture = outputQueue.take(); //blocks until a Future is available
            Object result = resultFuture.get(); //blocks until that task has finished
            //process the output (write to file, send to network, print to screen, etc...)
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    } catch (ExecutionException e) {
        //the task itself threw an exception; e.getCause() holds the original
        throw new RuntimeException(e.getCause());
    }
}

I'll leave it to you to figure out how to implement Runnable to make the input and output thread as well as how to implement Callable for the tasks you need to process.
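For reference, here is one way the pieces left open above could be filled in. It is only a minimal sketch under some assumptions: the class name OrderedPipeline, the method process, and the upper-casing task are made up for illustration, the input is an in-memory list instead of a file, and the output side stops after a known number of results instead of looping forever.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical class name; the "work" done per chunk is just upper-casing.
class OrderedPipeline {

    static List<String> process(List<String> chunks, int threads)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        BlockingQueue<Future<String>> outputQueue = new LinkedBlockingQueue<>();
        List<String> output = new ArrayList<>();
        try {
            // Input thread: submits one task per chunk and queues the
            // Futures in the same order as the input.
            Thread inputThread = new Thread(() -> {
                for (String chunk : chunks) {
                    Callable<String> task = () -> chunk.toUpperCase();
                    outputQueue.add(executor.submit(task));
                }
            });
            inputThread.start();

            // Output side (here: the calling thread). take() blocks until the
            // next Future is queued, get() blocks until that task is done.
            for (int i = 0; i < chunks.size(); i++) {
                output.add(outputQueue.take().get());
            }
            inputThread.join();
        } finally {
            executor.shutdown();
        }
        return output;
    }

    public static void main(String[] args) throws Exception {
        // prints [A, B, C, D]
        System.out.println(process(Arrays.asList("a", "b", "c", "d"), 2));
    }
}
```

The tasks may finish in any order, but since the Futures sit in the queue in submission order, the results come out in input order.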

Tim Bender
  • This is pretty much what I mentioned, except for the `ExecutorService`, posted 10 hours later and without touching upon important points like the difference between order of processing and order of output. – Bhaskar Oct 26 '11 at 22:32
  • Could you explain what my BlockingQueue would be doing in my specific example? This is the part I'm struggling to understand. – Casey Patton Oct 27 '11 at 00:06
  • @Casey Patton, the blocking queue holds the results of the tasks you submit. The pattern being used looks kind of like this: --single thread input read--<==multithreaded processing==>--single thread output write. The "multithreaded processing" part should just compute results, not output them. – Tim Bender Oct 27 '11 at 00:18
  • Awesome, thanks a lot for the help. Gave you a +1. I'm thinking I should probably use "put" instead of "add" on my queue though in the processInput, since I want to wait for threads to finish before adding more onto it. Does this seem right or is there something I'm not thinking about? – Casey Patton Oct 27 '11 at 00:51
  • You're right, put would be better in the case that the BlockingQueue is also bounded. – Tim Bender Oct 27 '11 at 04:35
0

Streams might come in handy:

List<Chunk> chunks = new ArrayList<>();
//....
Function<Chunk, String> toWeightInfo = (chunk) -> "weight = "+(chunk.size()*chunk.prio());

List<String> results = chunks.parallelStream()
  .map(toWeightInfo)
  .collect(Collectors.toList());

System.out.println(results);

The parallel stream uses the system's common "fork/join" thread pool, whose default parallelism is one less than the number of available logical CPUs (the calling thread also takes part), and processes your elements in parallel. Because the source list is ordered, collecting into a list also preserves the order of the input. The parallel streams API hides the complexity of assigning free threads to jobs, along with optimizations like work-stealing. Just give it something to chew on and it will work its magic.
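One caveat on ordering, in case the results are written out directly instead of being collected: forEach on a parallel stream makes no ordering promise, whereas forEachOrdered performs the action in encounter order. A small sketch of that, where the class name OrderedStreamOutput, the method processInOrder, and the upper-casing step are placeholders for your own work:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical class name; upper-casing stands in for the real chunk processing.
class OrderedStreamOutput {

    static List<String> processInOrder(List<String> chunks) {
        List<String> written = new ArrayList<>();
        chunks.parallelStream()
              .map(String::toUpperCase)        // may run on several threads
              .forEachOrdered(written::add);   // runs one element at a time, in input order
        return written;
    }

    public static void main(String[] args) {
        // prints [ONE, TWO, THREE]
        System.out.println(processInOrder(Arrays.asList("one", "two", "three")));
    }
}
```

In a real program the forEachOrdered action would be the write to your file or stream; the list is only used here to make the order visible.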

If you need to use a thread pool of a custom size, please refer to the Custom thread pool in Java 8 parallel stream question. You might also have a look at this good Java 8 Stream Tutorial.

If your case is rather complex and you're streaming chunks into your program, and you've got multiple stages of work, where some must be serial and some can be parallel and some depend on each other, you might have a look at the Disruptor framework from LMAX.

Kind regards

Brixomatic
0

I would suggest using commons-pool, which offers pooling of threads so that you can easily limit the number of threads in use; it also offers some other helper methods.

Concerning the ordering: have a look at the synchronized keyword.

I would also suggest having a look at the Java tutorial (the part about concurrency): http://download.oracle.com/javase/tutorial/essential/concurrency/index.html

HefferWolf
-1

Use ExecutorCompletionService and Future<T>. Together they provide a threadpool based task framework that takes care of all your concerns.

How can I check whether or not there's a free thread in the array? I know that there is a function ThreadAlive, but it seems super inefficient to poll every single thread every time in my loop.

You don't have to. The executor will do this for you in a (super) efficient manner. You just have to submit tasks to it and sit back.

If there is no free thread, how can I wait until one has finished?

Again, you really don't have to. This is taken care of by the executor.

The ordering is important. How can I preserve the ordering in which the threads output? As in, the order of the output needs to match the order of the input. How can I guarantee this synchronization?

This is a concern. If you want the processed output (of chunks, in your words) to arrive in the same order as the chunks appear in the initial array, you have to address a few points:

Is it just the order of arrival of the results that matters, or do the tasks themselves depend on the order in which they are processed? If it is the former, it is much more easily done, but if it is the latter, then you have problems (which I think are very hard things to start with, considering your admission of being new to Java, so I would just recommend more learning on your part before attempting this).

Assuming it is the former case, what you can do is this: submit the chunks to the executor in order, and each submission will give you a handle (called a Future<Result>) to the task's processed output. Store these handles in an ordered queue, and when you want the results, call get() on these Futures in that order. Note that if some task in the middle of the order takes a long time to complete, then the results of the following tasks will also be delayed.
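A rough sketch of that idea, with made-up names (OrderedFutures, squareAll) and squaring a number standing in for the real work. The artificial sleep is only there so that earlier tasks tend to finish later, to show that the output order does not depend on the completion order:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical class name; squaring stands in for the real chunk processing.
class OrderedFutures {

    static List<Integer> squareAll(List<Integer> inputs, int threads)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            // Submit in input order and remember the handles in that order.
            List<Future<Integer>> handles = new ArrayList<>();
            int remaining = inputs.size();
            for (Integer n : inputs) {
                long delayMillis = 5L * remaining--;   // earlier tasks sleep longer
                Callable<Integer> task = () -> {
                    Thread.sleep(delayMillis);
                    return n * n;
                };
                handles.add(executor.submit(task));
            }

            // get() blocks until the given task is done, so walking the list
            // front to back yields results in input order.
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> handle : handles) {
                results.add(handle.get());
            }
            return results;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // prints [1, 4, 9, 16]
        System.out.println(squareAll(Arrays.asList(1, 2, 3, 4), 4));
    }
}
```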

How do I even pass the chunk to my thread? Can I just use the Runnable interface to do this?

Create one Callable instance per chunk, wrapping the chunk in the instance. This represents the task that you will submit() to the ExecutorService.
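As an illustration, such a task class might look like the following. ChunkTask is a name I made up, the chunk is assumed to be a String, and reversing it is just a placeholder for the real processing:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical task class: the chunk is handed over through the constructor.
class ChunkTask implements Callable<String> {

    private final String chunk;

    ChunkTask(String chunk) {
        this.chunk = chunk;
    }

    // Runs on a pool thread; the return value becomes the Future's result.
    @Override
    public String call() {
        return new StringBuilder(chunk).reverse().toString();
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<String> handle = executor.submit(new ChunkTask("abc"));
            System.out.println(handle.get());   // prints cba
        } finally {
            executor.shutdown();
        }
    }
}
```

Unlike Runnable, Callable can return a value and throw checked exceptions, which is why it fits tasks that compute a result.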

Bhaskar
  • `ExecutorCompletionService` is not suitable for tasks in which the order of processing results matters. More specifically, that interface was created explicitly for the case when order of output does not matter. An `ExecutorService` is the appropriate interface to use here. – Tim Bender Oct 26 '11 at 20:53
  • @TimBender, first, can you tell me how you can achieve the ordered processing using `ExecutorService` that you cannot achieve using `ExecutorCompletionService`? Second, the OP does not mention that *order of processing* is to be maintained; all that is said is that *order of output* is important. There is a big difference between the two, and that is exactly what I noted in my third point. – Bhaskar Oct 26 '11 at 22:18
  • First, I said order of processing output, not order of processing, read again, there is a difference. Second, if used, your third point would create a memory leak since the `BlockingQueue` internal to the `ExecutorCompletionService` implementation would not be drained. Most importantly though is the clarity of the intent of the interfaces. `ExecutorCompletionService` is explicitly intended for use cases where the order of retrieving results from submitted tasks does not matter. That doesn't fit this use case. – Tim Bender Oct 26 '11 at 23:58
  • From the javadoc for `CompletionService`: "A service that decouples the production of new asynchronous tasks from the consumption of the results of completed tasks. Producers submit tasks for execution. Consumers take completed tasks and process their results in the **order they complete**. A CompletionService can for example be used to manage asynchronous IO, in which tasks that perform reads are submitted in one part of a program or system, and then acted upon in a different part of the program when the reads complete, **possibly in a different order than they were requested**." – Tim Bender Oct 27 '11 at 00:06
  • lol, it is only a silly quarrel when you downvote answers out of spite. – Tim Bender Oct 27 '11 at 00:07