0

I´m having problem to copy some emails to other folder using threads, my problem is, the code don´t wait to finish the job.

I want to move the messages by threads to accelaret the job, but I need to wait to move all messages, so how can I do this?

private static void moveMessagesToFolders(List<Message> listMessages, Store store, Set<String> setSender) throws MessagingException {

    HashMap<String, List<Message>> mapMessages = separeteMessagesBySender(listMessages, setSender);

    for (Entry<String, List<Message>> mapMessage : mapMessages.entrySet()) {
        Message[] messageArray = mapMessage.getValue().toArray(new Message[mapMessage.getValue().size()]);
        moveMessagesThread(messageArray, mapMessage, store);
    }
}

private static void moveMessagesThread(Message[] messageArray, Entry<String, List<Message>> mapMessage, Store store) {
    Set<Thread> setThread = createMovimentSetThread(messageArray, mapMessage, store);

    for (Thread thread : setThread) {
        thread.start();
    }
}

private static Set<Thread> createMovimentSetThread(Message[] messageArray, Entry<String, List<Message>> mapMessage, Store store) {

    int [] threadIndexs = MathUtil.generateIndex(messageArray);
    Set<Thread> setThread = new HashSet<>(threadIndexs.length);

    for (int i = 0; i < threadIndexs.length; i++) {
        setThread.add(new ThreadMoveMessages(messageArray, mapMessage, store, threadIndexs[i]));
    }

    return setThread;
}

After i change the method to this implementing Executor.

private static void moveMessagesThread(Message[] messageArray, Entry<String, List<Message>> mapMessage, Store store) {

        int [] threadIndexs = MathUtil.generateIndex(messageArray);
        ExecutorService executor = Executors.newFixedThreadPool(4);

        for (int i = 0; i < 4; i++) {
            executor.execute(new ThreadMoveMessages(messageArray, mapMessage, store, threadIndexs[i]));
        }

        executor.shutdown();
    }

Implementing the class Thread

public class ThreadMoveMessages implements Callable<Boolean> {

    private Entry<String, List<Message>> mapMessage;
    private Store store;
    private Message[] messageArray;
    private static int indexControler;
    private static int indexLimit;

    public ThreadMoveMessages(Message[] messageArray, Entry<String, List<Message>> mapMessage, Store store, int indexEnd) {
        this.messageArray = Arrays.copyOf(messageArray, indexEnd);
        this.indexControler += indexEnd;
        this.indexLimit = indexControler;
        this.mapMessage = mapMessage;
    }

    @Override
    public Boolean call() throws Exception {
        Folder folder = null;
        try {
            folder = this.store.getDefaultFolder().getFolder(this.mapMessage.getKey());
            folder.open(Folder.READ_WRITE);
            folder.appendMessages(this.messageArray);
            EmailUtil.deleteListMessage(this.mapMessage.getValue());
        } catch (MessagingException e) {
            e.printStackTrace();
        }                       
        return true;
    }
}
Diego Macario
  • 1,240
  • 2
  • 21
  • 33
  • Consider using an [Executor](http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html) instead of managing the thread pool directly. – Szymon Jednac Dec 08 '13 at 13:20
  • @SzymonBiliński I´m reading about `Executor`, i felt confused because what I need to do? I found people implementing new class or using a single method – Diego Macario Dec 08 '13 at 13:29
  • You should probably start with the [basics](http://docs.oracle.com/javase/tutorial/essential/concurrency/executors.html), than read [this](http://stackoverflow.com/questions/3269445/executorservice-how-to-wait-for-all-tasks-to-finish) (waiting for all tasks to finish). – Szymon Jednac Dec 08 '13 at 13:47
  • @SzymonBiliński I found one article and put changed the method, I´ve updated the question, but still have problens, I seens to don´t call the run method in thread class – Diego Macario Dec 08 '13 at 13:54

1 Answers1

2

You should use Futures and Callables if you want to wait compute something in asynchronous and wait for the results.

Implement Callable interface:

class MoveMessages implements Callable<Boolean> {

@Override
public Boolean call() throws Exception {
    boolean success = true;

    // Your implementation here

    return success;
}

}

And next submit it to Executor and retrieve Future, invoking get() on Future you will wait until computation of Callable is done.

ExecutorService executor = Executors.newFixedThreadPool(5);

MoveMessages moveMessages = new MoveMessages();
Future<Boolean> submit = executor.submit(moveMessages);

Boolean integer = submit.get(); // Will wait until task is finished

executor.shutdown();

Of course you can submit more task gets all to list and wait until all will finish.

Edit:

OK, first you say that you need to wait until all messages are moved, so one way for that case is to use Future and callable with ExecutorService. Using ExecutorService you do not need to create and start lots of new threads. Remember creating new thread generate costs. In your code you create 4 new threads for each sender, using ExecutorService you create only fixed number of threads and reuse them for each Sender. Here is your example using Executors and Futures, note that ExecutorService is created once for invoking moveMessagesToFolders:

private static ExecutorService executor 


private static void moveMessagesToFolders(List<Message> listMessages, Store store, Set<String> setSender) throws MessagingException {
    executor = Executors.newFixedThreadPool(4);

    HashMap<String, List<Message>> mapMessages = separeteMessagesBySender(listMessages, setSender);

    for (Map.Entry<String, List<Message>> mapMessage : mapMessages.entrySet()) {
        Message[] messageArray = mapMessage.getValue().toArray(new Message[mapMessage.getValue().size()]);
        moveMessagesThread(messageArray, mapMessage, store);
    }

    executor.shutdown();
}

private static void moveMessagesThread(Message[] messageArray, Map.Entry<String, List<Message>> mapMessage, Store store) {
    List<Future<Boolean>> futures = createMovimentSetThread(messageArray, mapMessage, store);

    for (Future<Boolean> future : futures) {
        try {
            Boolean success = future.get(); // Will wait to accomplished all submited Callables
            if(!success) { // Check if all submited callables end succesulfy 
                throw new RuntimeException("Something goes wrong while moving messages");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

}

private static List<Future<Boolean>> createMovimentSetThread(Message[] messageArray, Map.Entry<String, List<Message>> mapMessage, Store store) {
    int [] threadIndexs = MathUtil.generateIndex(messageArray);
    List<Future<Boolean>> futures = new ArrayList<>();

    for (int i = 0; i < threadIndexs.length; i++) {
        Future<Boolean> submit = executor.submit(new ThreadMoveMessages(messageArray, mapMessage, store, threadIndexs[i]));
        futures.add(submit);
    }

    return futures;
}

According to your comments, that you split array to smaller pieces using Fork/Join framework may be a better solution. Look in google for some more information. Some links:

Community
  • 1
  • 1
Marcin Wagner
  • 311
  • 2
  • 6
  • Ok, but I don´t need to create 5 `instances` of the `moveMessages` class? I put the code in the question Because i share one array in other arrays smallers to make easier the job – Diego Macario Dec 08 '13 at 14:13
  • You can use only one thread or more to accomplished all tasks. 5 it was only for example. Read about `Executors` you will find that you can maintain only one `ExecutorSerive` for entire application so you will only submit callables to it for different invocation of `moveMessagesToFolders`. Look on this site [link](http://www.vogella.com/articles/JavaConcurrency/article.html#threadpools) – Marcin Wagner Dec 08 '13 at 14:26
  • for you understand, I have a hahsmap with the email senders and a list of messages, i get this list and move the messages to the folders, but I want to increase the process of move message because it´s slow – Diego Macario Dec 08 '13 at 16:32
  • I update my answer, please check if it resolving your problem. I also post 2 links to Fork/Join Framework maybe it will be a better solution for you (Dividing array to smaller pieces). – Marcin Wagner Dec 08 '13 at 18:16
  • I´ve got doing it by using thread, but the time is higher than without thread – Diego Macario Dec 08 '13 at 18:32
  • So the biggest problem in application are I/O operations. You have asked for solution to wait until all threads are finished, and you post code sample for that problem. Reading `ThreadMoveMessages` I can only suggest to move messages instead of coping it, it will be much faster. I think you should create new question about that problem. – Marcin Wagner Dec 08 '13 at 18:49
  • Yes I thought i/o woudn´t have problem, but have, but your code help me in other ways, because I need to make other things with threads in my code – Diego Macario Dec 08 '13 at 18:57