3

I'm using Rails 5. I have this gem for managing threads ...

gem 'concurrent-ruby'

I notice that if one of my threads throws an error, it is just swallowed and I never find out about it. I tried this in a console

pool = Concurrent::FixedThreadPool.new(1)
  # => #<Concurrent::FixedThreadPool:0x007fe3585ab368 @__lock__=#<Thread::Mutex:0x007fe3585ab0c0>, @__condition__=#<Thread::ConditionVariable:0x007fe3585ab098>, @min_length=1, @max_length=1, @idletime=60, @max_queue=0, @fallback_policy=:abort, @auto_terminate=true, @pool=[], @ready=[], @queue=[], @scheduled_task_count=0, @completed_task_count=0, @largest_length=0, @ruby_pid=23932, @gc_interval=30, @next_gc_time=252232.13299, @StopEvent=#<Concurrent::Event:0x007fe3585aaf30 @__lock__=#<Thread::Mutex:0x007fe3585aaeb8>, @__condition__=#<Thread::ConditionVariable:0x007fe3585aae90>, @set=false, @iteration=0>, @StoppedEvent=#<Concurrent::Event:0x007fe3585aadc8 @__lock__=#<Thread::Mutex:0x007fe3585aad78>, @__condition__=#<Thread::ConditionVariable:0x007fe3585aad50>, @set=false, @iteration=0>> 
nums.each do |num|
  pool.post do
    if num == 1
      asdfasdf
    end
  end
end
  # => [1, 2, 3] 
pool.shutdown             # => true 
pool.wait_for_termination # => true 

I was wondering how, if one of the threads from my pool throws an error, I can throw an exception when all the threads have completed, halting my program. If none of the threads throws an error, then I'm fine to continue with whatever was happening.

Above, you'll notice I intentionally cause a condition that should result in an error, but I never find out about it, because I guess the threadpool is swallowing the output of the exceptions.

ndnenkov
  • 35,425
  • 9
  • 72
  • 104

2 Answers2

3

If you need built-in exception handling, you should use a higher-level abstraction instead of using thread pools directly. Refer to this comment from the author of concurrent-ruby:

Most applications should not use thread pools directly. Thread pools are a low-level abstraction meant for internal use. All of the high-level abstractions in this library (Promise, Actor, etc.) all post jobs to the global thread pool and all provide exception handling. Simply pick the abstraction that best fits your use case and use it.

If you feel the need to configure your own thread pool rather than use the global thread pool, you can still use the high-level abstractions. They all support an :executor option which allows you to inject your custom thread pool. You can then use the exception handling provided by the high-level abstraction.

Here's a variation on your example using the Promise abstraction. This will re-raise an exception as soon as the thread pool raises one:

require 'concurrent'
pool = Concurrent::FixedThreadPool.new(1)
promises = (1..10).map do |num|
  Concurrent::Promise.execute(executor: pool) do
    if num == 1
      asdfasdf
    else
      num
    end
  end
end
promises.map(&:value!)

# NameError: undefined local variable or method `asdfasdf' for main:Object
#     from (irb):57:in `block (2 levels) in irb_binding'
#     [...]

To re-raise an exception only after all threads have completed (not immediately upon the first exception), you can replace promises.map(&:value!) with Concurrent::Promise.zip(*promises).value!.

To store the exception in the collection result without re-raising it, you can do something like promises.map { |p| p.value || p.reason }:

# => [#<NameError: undefined local variable or method `asdfasdf' for main:Object>, 2, 3, 4, 5, 6, 7, 8, 9, 10]

Finally, note that a fixed thread pool with only 1 thread will execute all tasks sequentially on a single thread. To execute them all in parallel (on a pool with 10 threads), change the thread-pool initializer to pool = Concurrent::FixedThreadPool.new(10).

Community
  • 1
  • 1
wjordan
  • 19,770
  • 3
  • 85
  • 98
  • I need to read up on the Concurrent::Promise class, but does this still run all ten iterations in parallel? –  Feb 01 '17 at 19:17
  • @Natalia A fixed thread pool with only 1 thread wouldn't run anything in parallel. But if you change the initializer to `Concurrent::FixedThreadPool.new(10)`, then yes they will run in parallel. Confirm/test for yourself by adding `sleep 1` at the top of the `Concurrent::Promise.execute` block (right above `if num == 1`). – wjordan Feb 01 '17 at 19:25
  • Thanks. Will the "promises.map(&:wait).map(&:value!)" line also block execution from finishing until all the threads in teh pool have been executed? I ask because as I test this out I'm starting to see errors like " (ActiveRecord::ConnectionTimeoutError) could not obtain a connection from the pool within 5.000 seconds" and I'm trying to figure out how to troubleshoot. –  Feb 01 '17 at 21:49
  • @Natalia Yes, [`#wait`](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Concern/Obligation.html#wait-instance_method) blocks execution until the Promise has been executed, so calling `promises.map(&:wait).map(&:value!)` will block until all promises have been executed. It looks like you could also use [`Concurrent::Promise.zip(*promises)`](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Promise.html#zip-class_method) to achieve the same effect. – wjordan Feb 01 '17 at 22:15
  • Is "pool = Concurrent::FixedThreadPool.new(1)" the equivalent of having no concurrency because there should only be one thread running at once? –  Feb 03 '17 at 20:45
  • @Natalia running a task on a single-thread `FixedThreadPool` would be more or less equivalent, but not exactly the same as 'having no concurrency' - the pool still executes tasks on a separate thread that runs concurrently with the main Ruby process. When using `wait`/`value` which blocks until the task in the thread pool has completed, it should execute in sequential order effectively equivalent to if there were no concurrency at all. For true 'zero-concurrency' use [`ImmediateExecutor.new`](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/ImmediateExecutor.html) instead. – wjordan Feb 03 '17 at 22:46
  • Thanks, and then to clarify in case I'm being dense, when you mention the "wait/value" where would that be added in the solution you listed above? Or is "promises.map(&:value!)" the same thing? –  Feb 06 '17 at 22:41
  • @Natalia `wait`, `wait!`, `value` and `value!` are all instance methods that will block until the `Promise` is complete (see documentation for [`Obligation`](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Concern/Obligation.html), which `Promise` inherits from). The only difference is that the `!` variants re-raise an exception, and `value` returns the value resolved by the `Promise` while `wait` returns the `Promise` itself. [`Array#map`](https://ruby-doc.org/core-2.2.0/Array.html#method-i-map) invokes a method on each element in an array. So `promises.map(&:value!)` is fine. – wjordan Feb 07 '17 at 15:43
  • Something about your code is kind of strange. Even with a thread pool size of one, I"m noticing the "ActiveRecord::Base.connection_pool.connections.size" count continues to increase without connections decreasing. I don't know a lot about how the database connection pool works, but it strikes me that it should always be one in a single threaded application. –  Feb 07 '17 at 21:28
  • @Natalia please split out your additional question related to ActiveRecord's connection pool count into another thread, along with [a minimal, complete and verifiable example](http://stackoverflow.com/help/mcve), because this particular problem doesn't appear anywhere in the question I've answered in this thread. – wjordan Feb 07 '17 at 22:28
3

To answer your question - no actual way as the library explicitly silences exceptions and there is no configuration for it.

A possible workaround would be to capture exceptions manually:

error = nil
pool = Concurrent::FixedThreadPool.new(1)
numbers.each do |number|
  pool.post do
    begin
      some_dangerous_action(number)
    rescue Exception => e
      error = e
      raise # still let the gem do its thing
    end
  end
end

pool.shutdown
pool.wait_for_termination

raise error if error
ndnenkov
  • 35,425
  • 9
  • 72
  • 104