7

I've just learned multi-threaded programming today due to a project requirement.

I have a string processing task which can be nicely divided into small subtasks.

while (...){
    ...
    // assign task for handler
    Thread t = new Thread(new PCHandler(counter,pc));
    t.start();
    counter++;
}

The problem is that I will need around 500K threads for this task. And I run into an error:

Caused by: java.lang.OutOfMemoryError: unable to create new native thread

I searched the web and it seems JVM only allows me to make maximum 32K threads. There are some instructions to extend this limit by modifying the profile file. But I want to avoid modify user's computer. So could you give me an advice how to manage them wisely within the limit?

halfer
  • 19,824
  • 17
  • 99
  • 186
sean
  • 1,632
  • 2
  • 15
  • 34
  • 4
    You should seriously consider a thread pool rather than one thread per task. The overhead of each thread will likely exceed the benefits of adding that much concurrency to your program. – Bret Kuhns Oct 02 '13 at 13:09
  • 3
    You could start another JVM instance and launch other threads, or use a thread pool, but what are you trying to accomplish? 500k threads sounds really a lot to me – BackSlash Oct 02 '13 at 13:09
  • 1
    Have you tried looking at using Thread Pool, using Executors? – Optional Oct 02 '13 at 13:09
  • 2
    As a rule of thumb, the number of concurrently running threads should be about the same as the number of your cores. – Matt Oct 02 '13 at 13:10
  • 3
    "I will need around 500K threads for this task" I seriously doubt this. Most likely you don't have this many CPUs so you don't need this many threads. – Peter Lawrey Oct 02 '13 at 13:10
  • You have just heard about multi-threaded programming but not much learned or understood. – Arne Burmeister Oct 04 '13 at 20:28

2 Answers2

24

The problem is that I will need around 500K threads for this task. And I run into a [memory error].

Sounds to me that you should be using a thread-pool so you can submit a large number of jobs but only run them in a smaller number of threads.

// create a thread pool with 10 threads, this can be optimized to your hardware
ExecutorService threadPool = Executors.newFixedThreadPool(10);
// submit your handlers to the thread-pool
for (PCHandler handler : handlersToDo) {
    threadPool.submit(handler);
}
// once we have submitted all jobs to the thread pool, it should be shutdown
threadPool.shutdown();
...

If this won't work then I'd like to know more details about a system that actually needs 500k concurrently running threads. You may be able to achieve this with some memory setting tweaking and increasing the core memory on your box but I suspect that re-architecting your application is in order.

As @Peter mentions in comments, to optimize the number of threads in the pool you can get the number of available processors and other system specs to figure this out. But it depends highly on how CPU intensive your PCHandler class is. The more IO it does, the more concurrency can be taken advantage of. Probably doings some test runs with different values passed to the newFixedThreadPool(...) method is in order to determine the optimal setting there.

Also, depending on how large the 500k job objects are, you may want to limit their creation. To do that, you could create a thread-pool with a bounded queue which would limit the number of jobs that can be outstanding at any one point in time.

Gray
  • 115,027
  • 24
  • 293
  • 354
  • 1
    +1 For CPU bound tasks the optimal number of threads may be the number CPU you have e.g. `Runtime.availableProcessors()` For resource bound tasks the optimal number might be less. – Peter Lawrey Oct 02 '13 at 13:12
  • 4
    In addition to the above, if you are thinking in terms of 500k separate tasks, your tasks are probably too small. Each task should be a bit time consuming in it's own right. It might be that it makes sense to have 500 jobs that each do 1000 of your sub task, rather than 500k jobs, for example. There is overhead in managing threads such that each one should be doing a decent chunk of work. – lgaud Oct 02 '13 at 13:15
  • @Peter excellent point. One minor nitpick, for resource limited tasks you generally want more threads than CPUs since they will spend a lot of time waiting for the resources. – user949300 Oct 04 '13 at 19:56
  • @Gray thanks. In my tests on CPU bound tasks (on an Intel core cpu) a good number of threads was N or 2N, where N is number of cores. Above that they start thrashing. Haven't done many IO bound tests. – user949300 Oct 04 '13 at 20:20
  • I'm surprised by that. I've run 1000s of threads in my applications without issue. I used to worry about thrashing with many threads doing high CPU load but I find that modern context switching and time slicing algorithms and OS support make it a non-issue @user949300. – Gray Oct 04 '13 at 20:22
  • @gray yes 1000 threads work but signicantly slower than the optimal number threads. – Peter Lawrey Oct 05 '13 at 09:31
  • On my 1x4 core i7 Mac, 1000 was not noticeably slower although the variance was high. 2000 however did show a drop off. This was a raw CPU counting loop with a 5 second warm up and then 5-10 second run time times 5 iterations to get the average @PeterLawrey. – Gray Oct 07 '13 at 00:17
1

Definitely not a good option to manage so many threads in a single machine by a single application unless it is a 16+ core machine or higher.

Consider factors like is your work I/O intensive or CPU intensive and make appropriate choices. Read here and here

I usually use

int maxThreadCount = Runtime.getRuntime().availableProcessors();
  ExecutorService executor = 
    new ThreadPoolExecutor(
      0, maxThreadCount - 1,
      1, TimeUnit.SECONDS,
      new LinkedBlockingDeque<>(maxThreadCount * 2),
      Executors.defaultThreadFactory(),
      new ThreadPoolExecutor.CallerRunsPolicy());

Now do the processing by adding your tasks and wait until everything is finished:

while (moreTaskstoDo) {
Callable c =...
    executor.submit(c);
}
executor.shutdown();
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);

Now with Java 8+ in place you could think of doing it more efficiently.

I did a small benchmarking myself. The below code is inspired by article and you can read more about Java 8 Handbook

Consider this function of finding a total.

//approach 1: old school
private static void findingTotalOldSchool()  {
    long total = 0;
    long start = System.nanoTime();

    for (long i = 1; i < LIMIT; i++) {
        total = total + (i * FACTOR);
    }

    long duration = (System.nanoTime() - start) / 1_000_000;
    System.out.println("Duration: "+duration);
    System.out.println("Total: "+total);
}

public static Range range(int max)  {
    return new Range(max);
}

// Approach 2: custom iterator
private static void findingTotalCustomIterator() {
    long total = 0;
    long start = System.nanoTime();

    for (long i : range(LIMIT)) {
        total = total + i * FACTOR;
    }

    long duration = (System.nanoTime() - start) / 1_000_000;
    System.out.println("Duration: "+duration);
    System.out.println("Total: "+total);
}

// Approach 3: using streams
private static void findingTotalStream() {
    long start = System.nanoTime(); 
    long total = 0;

    total = LongStream.range(1, LIMIT)
            .map(t -> t * FACTOR)
            .sum();

    long duration = (System.nanoTime() - start) / 1_000_000;
    System.out.println("Duration: "+duration);
    System.out.println("Total: "+total);
}

// Approach 4: using parallel streams
private static void findingTotalParallelStream() {
    long start = System.nanoTime(); 
    long total = 0;

    total = LongStream.range(1, LIMIT)
            .parallel()
            .map(t -> t * FACTOR)
            .sum();

    long duration = (System.nanoTime() - start) / 1_000_000;
    System.out.println("Duration: "+duration);
    System.out.println("Total: "+total);
}

// Approach 5: Using Completable Futures alone
private static void findingTotalCFS() {
     long start = System.nanoTime();

     List<CompletableFuture<Long>> futures = 
             LongStream.range(1, LIMIT).boxed()
             .map(t -> CompletableFuture.supplyAsync(() -> t * FACTOR ))
             .collect(Collectors.toList());
     //Code here --- could run ahead hence joining on futures
     long total = futures.stream().map(CompletableFuture::join).mapToLong(t->t).sum();

     long duration = (System.nanoTime() - start) / 1_000_000;
     System.out.println("Futures used: "+futures.size());
     System.out.println("Duration: "+duration);
     System.out.println("Total: "+total);
}

// Approach 6: Using Completable Futures managed by Executor Service
private static void findingTotalCFSE() {
    long start = System.nanoTime();

    ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1);
    List<CompletableFuture<Long>> futures =
             LongStream.range(1, LIMIT).boxed()
             .map(t -> CompletableFuture.supplyAsync(() -> {
                    return t * FACTOR;
            }, executor))
             .collect(Collectors.toList());

     long total = futures.stream().map(CompletableFuture::join).mapToLong(t->t).sum();
     executor.shutdownNow();

     long duration = (System.nanoTime() - start) / 1_000_000;
     System.out.println("Futures used: "+futures.size());
     System.out.println("Duration: "+duration);
     System.out.println("Total: "+total);
}

// Approach 7: Using Executor service alone
private static void findingTotalES() {
    long start = System.nanoTime();

    ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1);
    long total  = LongStream.
        range(1, LIMIT)
        .boxed()
        .map((i)->executorService.submit(new Operation(i, FACTOR)))
        .map((Future<Long> future)-> {
            try {
                return future.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }catch (ExecutionException e) {
                // Extract the actual exception from its wrapper
                Throwable t = e.getCause();
            } 
            return 0;
        })
        .mapToLong(t->t.longValue())
        .sum();

    executorService.shutdown();

    long duration = (System.nanoTime() - start) / 1_000_000;
    System.out.println("Duration: "+duration);
    System.out.println("Total: "+total);
}

class Operation implements Callable<Long> {

    long i; int j;
    Operation(long i, int j) { this.i = i; this.j = j; }

    @Override
    public Long call() {
        return i * j;
    }
}


class Range implements Iterable<Integer> {

    private int limit;

    public Range(int limit) {
        this.limit = limit;
    }

    @Override
    public Iterator<Integer> iterator() {
        final int max = limit;
        return new Iterator<Integer>() {

            private int current = 0;

            @Override
            public boolean hasNext() {
                return current < max;
            }

            @Override
            public Integer next() {
                if (hasNext()) {
                    return current++;   
                } else {
                    throw new NoSuchElementException("Range reached the end");
                }
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException("Can't remove values from a Range");
            }
        };
    }
}

We ran test runs with 2 sets of data. Each test should be run individually and not as part of a single whole run (as JVM optimizes and the result might vary).

//first run
final static int FACTOR = 1;
final static int LIMIT = 10000;

//second run
final static int FACTOR = 9876;
final static int LIMIT = 1000000;


System.out.println("-----Traditional Loop-----");
findingTotalOldSchool();
// 0 ms
// 4 ms     

System.out.println("-----Custom Iterator----");
findingTotalCustomIterator();
// 1 ms
// 15 ms


System.out.println("-----Streams-----");
findingTotalStream();
// 38 ms
// 33 ms        


System.out.println("-----Parallel Streams-----");
findingTotalParallelStream();
// 29 ms
// 64 ms


System.out.println("-----Completable Futures with Streams-----");
findingTotalCFS();
// 77 ms
// 635 ms       


System.out.println("-----Executor Service with Streams-----");
findingTotalES();
// 323 ms
// 12632 ms

System.out.println("-----Completable Futures with Executor Service with Streams-----");
findingTotalCFSE();
// 77 ms
// 844 ms   

Observations:

  • Traditional loop is fast most of the cases.
  • Use parallel streams when performance or IO operations are involved.
  • For simple iterations (involving substitutions or simple numeric calculations) go for traditional loop.
  • Completable Futures with Executor Service is flexible and a go to option when you need more control on the number of threads, etc. If what your doing is complex go for higher order systems that helps you to distribute it horizontally like Akka or Vert.x
John Eipe
  • 10,922
  • 24
  • 72
  • 114