
I have a computation that can be divided into independent units. The way I'm dealing with it now is by creating a fixed number of threads and then handing off chunks of work to be done in each thread. In pseudocode, it looks like this:

# main thread
work_units.take(10).each {|work_unit| spawn_thread_for work_unit}

def spawn_thread_for(work)
  Thread.new do
    do_some_work work
    more_work = work_units.pop
    spawn_thread_for more_work unless more_work.nil?
  end
end

Once the initial threads are created, each one does some work and then keeps taking work from the work stack until nothing is left. Everything works fine when I run it in irb, but when I execute the script with the interpreter, things don't work out so well. I'm not sure how to make the main thread wait until all the work is finished. Is there a nice way of doing this, or am I stuck with executing `sleep 10 until work_units.empty?` in the main thread?
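The irb/script difference comes from how Ruby ends a program: when the main thread of a script finishes, the interpreter exits and kills any threads that are still running, while in irb the main thread stays alive waiting for your next input. A minimal illustration (the queue and the sleep are stand-ins for real work):

```ruby
require 'thread' # Queue lives here on Ruby 1.9; harmless on later versions

log = Queue.new

worker = Thread.new do
  sleep 0.1 # simulate some work
  log << :finished
end

# If a script ended right here, `worker` would be killed before it pushed :finished.
worker.join # block the main thread until the worker is done
```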

David K.
  • Doesn't `take(10)` mean that only the first 10 `work_units` will be processed, ever? – Andrew Grimm Jun 05 '11 at 23:57
  • @Andrew Grimm: Actually no, but rereading the question it looks like `work_units` would be out of scope for `spawn_thread_for` for the line `more_work = work_units.pop` – Andrew Grimm Jun 07 '11 at 10:58
  • In the actual code it's an instance variable so there are no scoping issues. – David K. Jun 07 '11 at 22:54
  • NOTE: [depending on which ruby implementation you use](https://stackoverflow.com/questions/56087/does-ruby-have-real-multithreading), the threads might not actually run in parallel (i.e. use multiple CPU cores to get the work done faster by splitting it over multiple threads). – pjvleeuwen Feb 29 '20 at 22:16

5 Answers


In Ruby 1.9 (and 2.0), you can use ThreadsWait from the standard library for this purpose:

require 'thread'
require 'thwait'

threads = []
threads << Thread.new { } # do some work in each thread...
threads << Thread.new { }
ThreadsWait.all_waits(*threads) # blocks until every thread has finished
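As far as I know, ThreadsWait stopped shipping with Ruby in 2.7 and is now the separate `thwait` gem. On newer Rubies, a rough stand-in for `all_waits` that also lets you react to threads in the order they finish is to have each thread announce itself on a Queue. This is a sketch, not the ThreadsWait API, and the delays are made up:

```ruby
require 'thread'

done = Queue.new

threads = [0.1, 0.05, 0.0].each_with_index.map do |delay, i|
  Thread.new do
    begin
      sleep delay # stand-in for real work
      i           # the thread's result
    ensure
      done << Thread.current # announce completion, even if the work raised
    end
  end
end

finished = []
# Each pop blocks until some thread finishes; #value joins it and returns its result.
threads.size.times { finished << done.pop.value }
```

`finished` fills up in completion order, so you can start processing results before the slowest thread is done.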
esad

If you modify spawn_thread_for to save a reference to your created Thread, then you can call Thread#join on the thread to wait for completion:

x = Thread.new { sleep 0.1; print "x"; print "y"; print "z" }
a = Thread.new { print "a"; print "b"; sleep 0.2; print "c" }
x.join # Let the threads finish before
a.join # main thread exits...

produces:

abxyzc

(Stolen from the ri Thread.new documentation. See the ri Thread#join documentation for some more details.)
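One more thing worth knowing about Thread#join: if the thread died with an exception, join re-raises that exception in the thread that called join, so failures in worker threads don't go unnoticed. A small sketch (the ArgumentError is just an example):

```ruby
# Silence the default "terminated with exception" report on stderr (Ruby 2.4+).
Thread.report_on_exception = false

t = Thread.new { raise ArgumentError, "boom" }

caught = nil
begin
  t.join # re-raises the thread's exception here, in the joining thread
rescue ArgumentError => e
  caught = e.message
end
```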

Even better, skip spawning new threads altogether: start a fixed number of threads that keep pulling work units off a shared Queue, save the Thread references, and join on them all:

(Untested, but ought to give the flavor)

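require 'thread' # for Queue on Ruby 1.9

# main thread
work_units = Queue.new # and fill the queue before starting the workers...

threads = 10.times.map do
  Thread.new do
    loop do
      begin
        w = work_units.pop(true) # non-blocking: raises ThreadError when empty
      rescue ThreadError
        break # queue drained, let this worker finish
      end
      do_some_work(w)
    end
  end
end

# main thread continues while work threads devour work

threads.each(&:join)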
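If the workers may have to wait for work that is still being produced, a variation is to let them block on `pop` and close the queue when no more work is coming. This relies on `Queue#close` (Ruby 2.3+), after which `pop` returns nil once the queue is drained, so workers can treat nil as the stop signal. A sketch, with squaring as a stand-in for do_some_work:

```ruby
work_units = Queue.new
results    = Queue.new

workers = Array.new(4) do
  Thread.new do
    # Blocks while the queue is empty but open; returns nil once closed and drained.
    while (w = work_units.pop)
      results << w * w # stand-in for do_some_work(w)
    end
  end
end

(1..20).each { |n| work_units << n } # workers are already consuming as we push
work_units.close                     # no more work: idle workers' pop now returns nil

workers.each(&:join)
squares = Array.new(results.size) { results.pop }
```

Note that nil can't be used as a work unit here, since it doubles as the "closed" signal.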
Andrew Grimm
sarnold
  • But this can potentially make `threads` quite large, because when a thread completes its reference is still in the `threads` array. Moreover, `threads` is being modified while `threads.each(&:join)` is executing, so this still doesn't solve the problem. – David K. Jun 05 '11 at 07:16
  • @davidk, perhaps instead of calling `spawn_thread_for` within each thread, simply look for more work and start directly? – sarnold Jun 05 '11 at 07:17
  • @sarnold, I only want a fixed number of threads to be active. If I look for more work and then start a thread it's possible I can overshoot the limit of 10 threads because of the uncertainty in how scheduling happens and the amount of work each thread is doing. – David K. Jun 05 '11 at 07:19
  • @davidk01, _why start another thread_ when one is 'done'? Just grab a new work unit in the already-running thread. Start ten threads and then let them start sucking work units off your work queue. (See the `Queue` class for a thread-safe queue implementation.) – sarnold Jun 05 '11 at 07:23
  • @sarnold, great point. I didn't think of it so thanks for pointing it out. – David K. Jun 05 '11 at 07:25
  • @davidk01, I re-wrote the answer a bit to try to show what I mean, maybe it'll even be useful. :) – sarnold Jun 05 '11 at 07:29
Join every thread except the current one:

Thread.list.each { |t| t.join unless t == Thread.current }
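With the question's recursive design, a single snapshot of Thread.list can miss threads that are spawned after the snapshot was taken. Because each thread creates its successor before it exits, one workaround is to keep re-reading Thread.list until only the current thread is left. A minimal sketch, assuming the question's spawn_thread_for; the work stack contents, the hypothetical `next_unit` helper, and the doubling "work" are stand-ins. Caveat: this waits for every thread in the process, including ones started by libraries, so it can hang if some background thread never finishes.

```ruby
require 'thread'

@work_units = (1..30).to_a # hypothetical stand-in for the real work stack
@processed  = Queue.new
@lock       = Mutex.new

# Hypothetical helper: pop under a lock so two threads never grab the same unit.
def next_unit
  @lock.synchronize { @work_units.pop }
end

# The question's recursive design: finish a unit, then spawn a successor.
def spawn_thread_for(work)
  Thread.new do
    @processed << work * 2 # stand-in for do_some_work(work)
    more_work = next_unit
    spawn_thread_for(more_work) unless more_work.nil?
  end
end

10.times do
  unit = next_unit
  spawn_thread_for(unit) unless unit.nil?
end

# A successor exists before its parent exits, so repeat until no other threads remain.
loop do
  others = Thread.list - [Thread.current]
  break if others.empty?
  others.each(&:join)
end
```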
brauliobo

It seems like you are replicating what the Parallel Each (Peach) library provides.
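For the curious, here is roughly what such a helper does under the hood. This is a hypothetical `parallel_map` sketch, not Peach's actual API: a fixed number of workers pull indexed items from a closed Queue (Ruby 2.3+), and the results are reassembled in input order.

```ruby
# Hypothetical helper, not part of Peach or the standard library.
def parallel_map(items, workers: 4, &block)
  jobs = Queue.new
  items.each_with_index { |item, i| jobs << [item, i] }
  jobs.close # pop returns nil once every job has been taken

  results = Queue.new
  threads = Array.new(workers) do
    Thread.new do
      while (job = jobs.pop)
        item, i = job
        results << [i, block.call(item)] # remember the input position
      end
    end
  end
  threads.each(&:join)

  # Restore input order from the [index, value] pairs.
  pairs = Array.new(results.size) { results.pop }
  pairs.sort_by(&:first).map(&:last)
end
```

For example, `parallel_map([3, 1, 2]) { |x| x * 10 }` gives `[30, 10, 20]` regardless of which worker finishes first.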

Hector Castro

You can use Thread#join:

thr.join → thr
thr.join(limit) → thr or nil

The calling thread will suspend execution and run thr. Does not return until thr exits or until limit seconds have passed. If the time limit expires, nil will be returned, otherwise thr is returned.
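A quick sketch of the two return values (the half-second sleep stands in for real work):

```ruby
slow = Thread.new { sleep 0.5; :finished }

first_try  = slow.join(0.05) # limit expires first: returns nil, thread keeps running
second_try = slow.join       # no limit: blocks until the thread exits, returns the thread
```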

You can also use Enumerable#each_slice to iterate over the work units in batches:

work_units.each_slice(10) do |batch|
  # handle each work unit of the batch in its own thread
  # (one thread per unit: the thread must not go on to pop more work itself)
  threads = batch.map do |work_unit|
    Thread.new(work_unit) { |unit| do_some_work(unit) }
  end

  # wait until the current batch finishes before handling the next one
  threads.each(&:join)
end
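To check that batching really caps concurrency at the slice size, here is a runnable sketch that counts how many threads are busy at once (the sleep stands in for do_some_work, and the counters are purely illustrative):

```ruby
require 'thread'

lock    = Mutex.new
running = 0 # threads currently doing work
peak    = 0 # highest value `running` ever reached

(1..25).each_slice(10) do |batch|
  threads = batch.map do |work_unit|
    Thread.new(work_unit) do |unit|
      lock.synchronize do
        running += 1
        peak = [peak, running].max
      end
      sleep 0.01 # stand-in for do_some_work(unit)
      lock.synchronize { running -= 1 }
    end
  end
  threads.each(&:join) # finish this batch before starting the next
end
```

The trade-off is that every batch waits for its slowest unit before the next one starts, so threads sit idle at the end of each batch; a fixed pool of workers pulling from a shared Queue keeps all of them busy.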
Hirurg103