74

Is there a Java equivalent of .NET's Parallel.For?

If there is, could someone please supply an example? Thanks!

user456584
Jamie

11 Answers

114

I guess the closest thing would be:

ExecutorService exec = Executors.newFixedThreadPool(SOME_NUM_OF_THREADS);
try {
    for (final Object o : list) {
        exec.submit(new Runnable() {
            @Override
            public void run() {
                // do stuff with o.
            }
        });
    }
} finally {
    exec.shutdown();
}

Based on TheLQ's comments, you would set SOME_NUM_OF_THREADS to Runtime.getRuntime().availableProcessors().
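Note that the loop above only queues the work: shutdown() does not wait for the tasks to finish. If you want the call to block the way Parallel.For does, one option is awaitTermination. Here is a minimal sketch under my own assumptions (the class name, the summing task and the one-minute timeout are made up for illustration):

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical example class, not part of the answer above.
class BlockingLoopSketch {
    // Runs one task per element and returns only after every task has finished.
    static int sumInParallel(List<Integer> list) {
        int threads = Runtime.getRuntime().availableProcessors();
        ExecutorService exec = Executors.newFixedThreadPool(threads);
        final AtomicInteger sum = new AtomicInteger(); // thread-safe accumulator
        try {
            for (final Integer o : list) {
                exec.submit(() -> { sum.addAndGet(o); });
            }
        } finally {
            exec.shutdown(); // stop accepting new tasks; queued tasks still run
        }
        try {
            // Wait for the queued tasks; the one-minute timeout is arbitrary.
            if (!exec.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to callers
            throw new IllegalStateException(e);
        }
        return sum.get();
    }

    public static void main(String[] args) {
        System.out.println(sumInParallel(Arrays.asList(1, 2, 3, 4))); // prints 10
    }
}
```

Creating a pool per call keeps the sketch short; in real code you would probably share one pool and wait on the returned futures instead.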

Edit: Decided to add a basic "Parallel.For" implementation

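import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Parallel {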
    private static final int NUM_CORES = Runtime.getRuntime().availableProcessors();

    // Pass a ThreadFactory as a second argument if the worker threads need custom names.
    private static final ExecutorService forPool = Executors.newFixedThreadPool(NUM_CORES * 2);

    public static <T> void For(final Iterable<T> elements, final Operation<T> operation) {
        try {
            // invokeAll blocks for us until all submitted tasks in the call complete
            forPool.invokeAll(createCallables(elements, operation));
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the caller can see the interruption
            Thread.currentThread().interrupt();
        }
    }

    public static <T> Collection<Callable<Void>> createCallables(final Iterable<T> elements, final Operation<T> operation) {
        List<Callable<Void>> callables = new LinkedList<Callable<Void>>();
        for (final T elem : elements) {
            callables.add(new Callable<Void>() {
                @Override
                public Void call() {
                    operation.perform(elem);
                    return null;
                }
            });
        }

        return callables;
    }

    public static interface Operation<T> {
        public void perform(T pParameter);
    }
}

Example Usage of Parallel.For

// Collection of items to process in parallel
Collection<Integer> elems = new LinkedList<Integer>();
for (int i = 0; i < 40; ++i) {
    elems.add(i);
}
Parallel.For(elems,
    // The operation to perform with each item
    new Parallel.Operation<Integer>() {
        public void perform(Integer param) {
            System.out.println(param);
        }
    });

I guess this implementation is really more similar to Parallel.ForEach

Edit: I put this up on GitHub if anyone is interested: Parallel For on GitHub.

Brendan Long
Matt Wonlaw
  • Yeah, you could easily pass an anonymous class to a method that does the work for you. You could even add a CountDownLatch and have each run() decrement it so that the call blocks. The only issue is that you don't know the number of cores a computer has, so you can't optimize the number of threads for the cores. – TheLQ Oct 24 '10 at 20:41
  • You can have the following to drop the extra obj reference: for (final Object o : list) { – Peter Lawrey Oct 24 '10 at 23:36
  • An example of how to use this might be nice. I'm getting hung up on how to set up the Operation... – jocull Oct 22 '12 at 04:33
  • @jocull - I added an example. Thanks for the suggestion. – Matt Wonlaw Oct 23 '12 at 00:57
  • Consider using Executors.newCachedThreadPool() to share the thread pool. That way if you are doing this in multiple places you won't be creating multiple thread pools that try to use all of the processors. – dcstraw Apr 23 '13 at 19:51
  • Setting `numThreads=f(numProcessors)` seems reasonable as long as there is **no blocking IO** taking place inside the parallel-for loop.... But now imagine the scenario of a for loop that is **not CPU-bound, but IO-bound** - for example a loop over 100 synchronous HTTP requests (may take 100 ms per request). I can imagine that in this scenario, limiting the number of threads to, say, 8 threads does not exploit the full level of achievable parallelism. – Abdull Jun 18 '13 at 03:13
  • I have a `Map`, and when I try Parallel.For over the keySet(), calling the `new.Operation`, I'm getting: `The method For(int, Iterable<? extends T>, Parallel.Operation) in the type Parallel is not applicable for the arguments`. Is there something to change in my CustomClass? – Pedro Dusso Apr 09 '14 at 06:42
  • Is `Thread.sleep` necessary within `for (final Object o : list)`? In my test, CPU and memory usage are very high without it. – Lei Yang Nov 21 '19 at 08:41
10

MLaw's solution is a very practical Parallel.ForEach. I made a small modification to add a Parallel.For.

public class Parallel
{
static final int iCPU = Runtime.getRuntime().availableProcessors();

public static <T> void ForEach(Iterable <T> parameters,
                   final LoopBody<T> loopBody)
{
    ExecutorService executor = Executors.newFixedThreadPool(iCPU);
    List<Future<?>> futures  = new LinkedList<Future<?>>();

    for (final T param : parameters)
    {
        Future<?> future = executor.submit(new Runnable()
        {
            public void run() { loopBody.run(param); }
        });

        futures.add(future);
    }

    for (Future<?> f : futures)
    {
        try   { f.get(); }
        catch (InterruptedException e) { } 
        catch (ExecutionException   e) { }         
    }

    executor.shutdown();     
}

public static void For(int start,
                   int stop,
               final LoopBody<Integer> loopBody)
{
    ExecutorService executor = Executors.newFixedThreadPool(iCPU);
    List<Future<?>> futures  = new LinkedList<Future<?>>();

    for (int i=start; i<stop; i++)
    {
        final Integer k = i;
        Future<?> future = executor.submit(new Runnable()
        {
            public void run() { loopBody.run(k); }
        });     
        futures.add(future);
    }

    for (Future<?> f : futures)
    {
        try   { f.get(); }
        catch (InterruptedException e) { } 
        catch (ExecutionException   e) { }         
    }

    executor.shutdown();     
}
}

public interface LoopBody <T>
{
    void run(T i);
}

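import java.util.concurrent.atomic.AtomicInteger;

public class ParallelTest
{
// The loop body runs on several threads, so the sum uses an atomic accumulator;
// a plain int field updated with += could lose updates.
final AtomicInteger k = new AtomicInteger(0);

public ParallelTest()
{
    Parallel.For(0, 10, new LoopBody <Integer>()
    {
        public void run(Integer i)
        {
            k.addAndGet(i);
            System.out.println(i);
        }
    });
    System.out.println("Sum = "+ k.get());
}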

public static void main(String [] argv)
{
    ParallelTest test = new ParallelTest();
}
}
Weimin Xiao
9

This builds on mlaw's suggestion: it adds a CountDownLatch and splits the range into chunks so that there are fewer submit() calls.

When tested with a 4-million-item array, this gives a 5x speedup over a sequential for() on my Core i7 2630QM CPU.

public class Loop {
    public interface Each {
        void run(int i);
    }

    private static final int CPUs = Runtime.getRuntime().availableProcessors();

    public static void withIndex(int start, int stop, final Each body) {
        if (stop <= start) return;  // empty range: avoids dividing by a zero chunksize below
        int chunksize = (stop - start + CPUs - 1) / CPUs;
        int loops = (stop - start + chunksize - 1) / chunksize;
        ExecutorService executor = Executors.newFixedThreadPool(CPUs);
        final CountDownLatch latch = new CountDownLatch(loops);
        for (int i=start; i<stop;) {
            final int lo = i;
            i += chunksize;
            final int hi = (i<stop) ? i : stop;
            executor.submit(new Runnable() {
                public void run() {
                    try {
                        for (int i=lo; i<hi; i++)
                            body.run(i);
                    } finally {
                        // Count down even if body.run throws, so latch.await() cannot hang
                        latch.countDown();
                    }
                }
            });
        }
        try {
            latch.await();
        } catch (InterruptedException e) {}
        executor.shutdown();
    }

    public static void main(String [] argv) {
        Loop.withIndex(0, 9, new Loop.Each() {
            public void run(int i) {
                System.out.println(i*10);
            }
        });
    }
}
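To make the chunking arithmetic concrete, here is a small sketch that only computes the chunk boundaries, using the same ceiling division as above. The class and method names are made up for illustration, and the worker count is passed in rather than read from the machine:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, only meant to illustrate the chunk boundaries.
class ChunkSketch {
    // Splits [start, stop) into at most `workers` contiguous half-open ranges.
    static List<int[]> chunks(int start, int stop, int workers) {
        List<int[]> result = new ArrayList<>();
        if (stop <= start) {
            return result; // empty range: nothing to split
        }
        int chunkSize = (stop - start + workers - 1) / workers; // ceiling division
        for (int lo = start; lo < stop; lo += chunkSize) {
            int hi = Math.min(lo + chunkSize, stop); // the last chunk may be shorter
            result.add(new int[] {lo, hi});
        }
        return result;
    }

    public static void main(String[] args) {
        // With 10 items and 4 workers the chunk size is 3.
        for (int[] c : chunks(0, 10, 4)) {
            System.out.println("[" + c[0] + ", " + c[1] + ")");
        }
        // prints [0, 3), [3, 6), [6, 9), [9, 10) on separate lines
    }
}
```

So with 4 workers you submit 4 tasks instead of 10, which is where the saving on submit() comes from.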
santiwk
6

Here is my contribution to this topic: https://github.com/pablormier/parallel-loops. The usage is very simple:

Collection<String> upperCaseWords = 
    Parallel.ForEach(words, new Parallel.F<String, String>() {
        public String apply(String s) {
            return s.toUpperCase();
        }
    });

It's also possible to change some aspects of the behaviour, like the number of threads (by default it uses a cached thread pool):

Collection<String> upperCaseWords = 
            new Parallel.ForEach<String, String>(words)
                .withFixedThreads(4)
                .apply(new Parallel.F<String, String>() {
                    public String apply(String s) {
                        return s.toUpperCase();
                    }
                }).values();

All the code is self-contained in one Java class and has no dependencies beyond the JDK. I also encourage you to look at the new functional-style way to parallelize with Java 8.
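For comparison, the upper-casing example could look roughly like this with a Java 8 parallel stream. This uses only the JDK, not the library above, and the class and method names are invented for the sketch:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical example class using only the JDK stream API.
class StreamUpperCaseSketch {
    static List<String> toUpperCase(List<String> words) {
        return words.parallelStream()          // runs on the common fork/join pool
                .map(String::toUpperCase)      // applied to elements in parallel
                .collect(Collectors.toList()); // result keeps the list's order
    }

    public static void main(String[] args) {
        System.out.println(toUpperCase(Arrays.asList("foo", "bar", "baz")));
        // prints [FOO, BAR, BAZ]
    }
}
```

For a list this small the parallel version is likely slower than a plain loop; it only pays off with enough work per element.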

Community
Pablo R. Mier
5

The fork/join framework, added in Java 7, provides concurrency support, but I don't know of an exact equivalent for Parallel.For.
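As a rough idea of how a parallel for could be built on fork/join, here is a sketch that splits an index range recursively with a RecursiveAction. Everything named here (ForkJoinForSketch, RangeTask, parallelFor, the THRESHOLD value) is made up for illustration, and the lambda and commonPool() need Java 8:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.function.IntConsumer;

// Hypothetical sketch of a parallel for on top of fork/join.
class ForkJoinForSketch {
    // Ranges at or below this size run sequentially; the value is arbitrary.
    private static final int THRESHOLD = 1_000;

    static class RangeTask extends RecursiveAction {
        private final int lo;
        private final int hi;
        private final IntConsumer body;

        RangeTask(int lo, int hi, IntConsumer body) {
            this.lo = lo;
            this.hi = hi;
            this.body = body;
        }

        @Override
        protected void compute() {
            if (hi - lo <= THRESHOLD) {
                for (int i = lo; i < hi; i++) {
                    body.accept(i); // small enough: run the loop body directly
                }
            } else {
                int mid = lo + (hi - lo) / 2;
                // Split in two halves and wait for both to complete.
                invokeAll(new RangeTask(lo, mid, body), new RangeTask(mid, hi, body));
            }
        }
    }

    // Runs body for every i in [start, stop) and returns when all are done.
    static void parallelFor(int start, int stop, IntConsumer body) {
        ForkJoinPool.commonPool().invoke(new RangeTask(start, stop, body));
    }

    public static void main(String[] args) {
        int[] squares = new int[10_000];
        parallelFor(0, squares.length, i -> squares[i] = i * i);
        System.out.println(squares[99]); // prints 9801
    }
}
```

Each index is written by exactly one task, so the array needs no locking; a shared counter would need an atomic or a reduction.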

Kirk Woll
Emil
  • Not to be rude, but ... the thing you linked to... tells me nothing. It's like some library that some guy might be working on somewhere, and it might be released (or it might not be). With no hint of even the syntax, let alone a download package. -- Am I missing something? – BrainSlugs83 Apr 19 '15 at 06:42
  • @BrainSlugs83: ForkJoin is part of the standard library since Java 7: https://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html – user268396 Aug 05 '18 at 14:39
4

A simpler option would be

// A thread pool which runs for the life of the application.
private static final ExecutorService EXEC = 
    Executors.newFixedThreadPool(SOME_NUM_OF_THREADS); 

//later 
EXEC.invokeAll(tasks); // you can optionally specify a timeout.
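In case it helps, here is roughly what building the tasks and reading the results could look like. This is only a sketch: the class name and the squaring task are invented, and it creates a local pool of 4 threads to stay self-contained, whereas the snippet above shares one pool for the life of the application:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical example class.
class InvokeAllSketch {
    static List<Integer> squares(int n) {
        ExecutorService exec = Executors.newFixedThreadPool(4); // 4 is arbitrary
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                final int k = i;
                tasks.add(() -> k * k); // one task per index
            }
            List<Integer> results = new ArrayList<>();
            // invokeAll blocks until every task is done and returns the
            // futures in the same order as the tasks.
            for (Future<Integer> f : exec.invokeAll(tasks)) {
                results.add(f.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            exec.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(squares(5)); // prints [0, 1, 4, 9, 16]
    }
}
```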
Peter Lawrey
3

Synchronization often kills the speedup of parallel for-loops. Parallel for-loops therefore often need thread-private data plus a reduction mechanism that combines each thread's private data into a single result.

So I've extended Weimin Xiao's Parallel.For version with a reduction mechanism.

public class Parallel {
public static interface IntLoopBody {
    void run(int i);
}

public static interface LoopBody<T> {
    void run(T i);
}

public static interface RedDataCreator<T> {
    T run();
}

public static interface RedLoopBody<T> {
    void run(int i, T data);
}

public static interface Reducer<T> {
    void run(T returnData, T addData);
}

private static class ReductionData<T> {
    Future<?> future;
    T data;
}

static final int nCPU = Runtime.getRuntime().availableProcessors();

public static <T> void ForEach(Iterable <T> parameters, final LoopBody<T> loopBody) {
    ExecutorService executor = Executors.newFixedThreadPool(nCPU);
    List<Future<?>> futures  = new LinkedList<Future<?>>();

    for (final T param : parameters) {
        futures.add(executor.submit(() -> loopBody.run(param) ));
    }

    for (Future<?> f : futures) {
        try { 
            f.get();
        } catch (InterruptedException | ExecutionException e) { 
            System.out.println(e); 
        }
    }
    executor.shutdown();     
}

public static void For(int start, int stop, final IntLoopBody loopBody) {
    if (stop <= start) return; // empty range: avoids dividing by a zero chunkSize below
    final int chunkSize = (stop - start + nCPU - 1)/nCPU;
    final int loops = (stop - start + chunkSize - 1)/chunkSize;
    ExecutorService executor = Executors.newFixedThreadPool(loops);
    List<Future<?>> futures  = new LinkedList<Future<?>>();

    for (int i=start; i < stop; ) {
        final int iStart = i;
        i += chunkSize;
        final int iStop = (i < stop) ? i : stop;

        futures.add(executor.submit(() -> {
            for (int j = iStart; j < iStop; j++) 
                loopBody.run(j);
        }));     
    }

    for (Future<?> f : futures) {
        try { 
            f.get();
        } catch (InterruptedException | ExecutionException e) { 
            System.out.println(e); 
        }
    }
    executor.shutdown();     
}

public static <T> void For(int start, int stop, T result, final RedDataCreator<T> creator, final RedLoopBody<T> loopBody, final Reducer<T> reducer) {
    if (stop <= start) return; // empty range: avoids dividing by a zero chunkSize below
    final int chunkSize = (stop - start + nCPU - 1)/nCPU;
    final int loops = (stop - start + chunkSize - 1)/chunkSize;
    ExecutorService executor = Executors.newFixedThreadPool(loops);
    List<ReductionData<T>> redData  = new LinkedList<ReductionData<T>>();

    for (int i = start; i < stop; ) {
        final int iStart = i;
        i += chunkSize;
        final int iStop = (i < stop) ? i : stop;
        final ReductionData<T> rd = new ReductionData<T>();

        rd.data = creator.run();
        rd.future = executor.submit(() -> {
            for (int j = iStart; j < iStop; j++) {
                loopBody.run(j, rd.data);
            }
        });
        redData.add(rd);
    }

    for (ReductionData<T> rd : redData) {
        try { 
            rd.future.get();
            if (rd.data != null) {
                reducer.run(result, rd.data);
            }
        } catch (InterruptedException | ExecutionException e) { 
            e.printStackTrace();
        }
    }
    executor.shutdown();     
}
}

Here is a simple test example: a parallel character counter using a non-synchronized map.

import java.util.*;

public class ParallelTest {
static class Counter {
    int cnt;

    Counter() {
        cnt = 1;
    }
}

public static void main(String[] args) {
    String text = "More formally, if this map contains a mapping from a key k to a " + 
            "value v such that key compares equal to k according to the map's ordering, then " +
            "this method returns v; otherwise it returns null.";
    Map<Character, Counter> charCounter1 = new TreeMap<Character, Counter>();
    Map<Character, Counter> charCounter2 = new TreeMap<Character, Counter>();

    // first sequentially
    for(int i=0; i < text.length(); i++) {
        char c = text.charAt(i);
        Counter cnt = charCounter1.get(c);
        if (cnt == null) {
            charCounter1.put(c, new Counter());
        } else {
            cnt.cnt++;
        }
    }
    for(Map.Entry<Character, Counter> entry: charCounter1.entrySet()) {
        System.out.println(entry.getKey() + ": " + entry.getValue().cnt);
    }

    // now parallel without synchronization
    Parallel.For(0, text.length(), charCounter2,
        // Creator
        () -> new TreeMap<Character, Counter>(), 
        // Loop Body
        (i, map) -> {
            char c = text.charAt(i);
            Counter cnt = map.get(c);
            if (cnt == null) {
                map.put(c, new Counter());
            } else {
                cnt.cnt++;
            }
        }, 
        // Reducer
        (result, map) -> {
            for(Map.Entry<Character, Counter> entry: map.entrySet()) {
                Counter cntR = result.get(entry.getKey());
                if (cntR == null) {
                    result.put(entry.getKey(), entry.getValue());
                } else {
                    cntR.cnt += entry.getValue().cnt;
                }
            }
        }
    );

    // compare results
    assert charCounter1.size() == charCounter2.size() : "wrong size: " + charCounter1.size() + ", " + charCounter2.size();
    Iterator<Map.Entry<Character, Counter>> it2 = charCounter2.entrySet().iterator();
    for(Map.Entry<Character, Counter> entry: charCounter1.entrySet()) {
        Map.Entry<Character, Counter> entry2 = it2.next();
        assert entry.getKey().equals(entry2.getKey()) && entry.getValue().cnt == entry2.getValue().cnt : "wrong content";
    }

    System.out.println("Well done!");
}
}
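If you are on Java 8, the same private-data-plus-reduction idea is available through stream collectors: a parallel collect fills separate intermediate containers and then merges them. A minimal sketch of the character counter in that style, assuming you are fine with Long counts instead of the Counter class (the class and method names are invented):

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical stream-based variant of the character counter.
class CharCountSketch {
    static Map<Character, Long> count(String text) {
        return text.chars()                 // IntStream of UTF-16 code units
                .parallel()
                .mapToObj(c -> (char) c)    // box each code unit to Character
                // groupingBy accumulates into separate maps and merges them,
                // so no explicit synchronization is needed here.
                .collect(Collectors.groupingBy(c -> c, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        System.out.println(count("abca")); // prints {a=2, b=1, c=1}
    }
}
```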
Chris
3

There is an equivalent of Parallel.For available as a Java extension. It is called Ateji PX, and there is a free version you can play with. http://www.ateji.com/px/index.html

It is the exact equivalent of Parallel.For and looks similar:

For ||

More examples and explanation on Wikipedia: http://en.wikipedia.org/wiki/Ateji_PX

Closest thing in Java, IMO.

Clippy
1

I found ForkJoinPool and IntStream very helpful in my case (a parallel for with a limited number of threads).

C#:

static void MathParallel(int threads)
        {
            Parallel.For(1, partitions, new ParallelOptions { MaxDegreeOfParallelism = threads }, (i) => {
                partitionScores[i] = Math.Sin(3*i);
            });
        }

and Java equivalent:

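static void mathParallel(int threads) {
    ForkJoinPool pool = new ForkJoinPool(threads);
    try {
        // range(1, partitions) matches the C# loop: from 1 (inclusive) to partitions (exclusive)
        pool.submit(() -> IntStream.range(1, partitions).parallel().forEach(i -> {
            partitionScores[i] = Math.sin(3 * i);
        })).join(); // block until the whole loop has finished instead of spinning
    } finally {
        pool.shutdown();
    }
}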
FaFryk
1

I have an updated Java Parallel class which can do Parallel.For, Parallel.ForEach, Parallel.Tasks, and partitioned parallel loop. Source code is as follows:

Examples of using those parallel loops are the following:

public static void main(String [] argv)
{
    //sample data
    final ArrayList<String> ss = new ArrayList<String>();

    String [] s = {"a", "b", "c", "d", "e", "f", "g"};
    for (String z : s) ss.add(z);
    int m = ss.size();

    //parallel-for loop
    System.out.println("Parallel.For loop:");
    Parallel.For(0, m, new LoopBody<Integer>()
    {
        public void run(Integer i)
        {
           System.out.println(i +"\t"+ ss.get(i));   
        }       
    });   

   //parallel for-each loop
   System.out.println("Parallel.ForEach loop:");
   Parallel.ForEach(ss, new LoopBody<String>()
   {
       public void run(String p)
       {
           System.out.println(p);               
       }       
   });

   //partitioned parallel loop
   System.out.println("Partitioned Parallel loop:");
   Parallel.ForEach(Parallel.create(0, m), new LoopBody<Partition>()
   {
       public void run(Partition p)
       {
           for(int i=p.start; i<p.end; i++)
               System.out.println(i +"\t"+ ss.get(i));
       }
   });

   //parallel tasks
   System.out.println("Parallel Tasks:");
   Parallel.Tasks(new Task []
   {
       //task-1
       new Task() {public void run()
       {
           for(int i=0; i<3; i++)
               System.out.println(i +"\t"+ ss.get(i));
       }},

       //task-2
       new Task() {public void run()
       {
           for (int i=3; i<6; i++)
               System.out.println(i +"\t"+ ss.get(i));
       }}   
   });
}
user456584
Weimin Xiao
0

This is what I use for Java 7 and earlier.

For Java 8 you can use forEach()
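A minimal sketch of what that could look like for an index-based loop, using a parallel IntStream (the class and method names are invented for the example):

```java
import java.util.stream.IntStream;

// Hypothetical example class using only the JDK stream API.
class ParallelForEachSketch {
    static double[] sines(int n) {
        double[] out = new double[n];
        // Each index is written by exactly one task, so the array needs no
        // synchronization; forEach returns after all indices are processed.
        IntStream.range(0, n).parallel().forEach(i -> out[i] = Math.sin(i));
        return out;
    }

    public static void main(String[] args) {
        double[] s = sines(1_000);
        System.out.println(s.length); // prints 1000
    }
}
```

If the loop body updates shared state (a list, a counter), you would need a thread-safe structure or a collector rather than plain forEach.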

[UPDATE]

Parallel class:

private static final int NUM_CORES = Runtime.getRuntime().availableProcessors();
private static final int MAX_THREAD = NUM_CORES*2;  

public static <T2 extends T, T> void For(final Iterable<T2> elements, final Operation<T> operation) {
    if (elements != null) {
        final Iterator<T2> iterator = elements.iterator();
        if (iterator.hasNext()) {
            final Throwable[] throwable = new Throwable[1];
            final Callable<Void> callable = new Callable<Void>() {
                boolean first = true;
                @Override
                public final Void call() throws Exception {
                    if ((first || operation.follow()) && iterator.hasNext()) {
                        T result;
                        result = iterator.next();
                        operation.perform(result);
                        if (first) {
                            synchronized (this) {
                                first = false;
                            }
                        }
                    }
                    return null;
                }
            };
            final Runnable runnable = new Runnable() {
                @Override
                public final void run() {
                    while (iterator.hasNext()) {
                        try {
                            synchronized (callable) {
                                callable.call();
                            }
                            if (!operation.follow()) {
                                break;
                            }
                        } catch (Throwable t) {
                            t.printStackTrace();
                            synchronized (throwable) {
                                throwable[0] = t;
                            }
                            throw new RuntimeException(t);
                        }
                    }
                }
            };
            final ExecutorService executor = Executors.newFixedThreadPool(MAX_THREAD);
            for (int threadIndex=0; threadIndex<MAX_THREAD && iterator.hasNext(); threadIndex++) {
                executor.execute(runnable);
            }
            executor.shutdown();
            while (!executor.isTerminated()) {
                try {
                    Thread.sleep(0,1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    throw new RuntimeException(e);
                }
            }
            if (throwable[0] != null) throw new RuntimeException(throwable[0]);
        }
    }
}

public interface Operation<T> {
    void perform(T pParameter);
    boolean follow();
}

Example

@Test
public void test() {
    List<Long> longList = new ArrayList<Long>();
    for (long i = 0; i < 1000000; i++) {
        longList.add(i);
    }
    final List<Integer> integerList = new LinkedList<>();
    Parallel.For((Iterable<? extends Number>) longList, new Parallel.Operation<Number>() {

        @Override
        public void perform(Number pParameter) {
            System.out.println(pParameter);
            integerList.add(pParameter.intValue());
        }

        @Override
        public boolean follow() {
            return true;
        }
    });
    for (Number num : integerList) {
        System.out.println(num);
    }
}

Monitoring

Crammeur