
How to handle exceptions in concurrent-ruby thread pools (http://ruby-concurrency.github.io/concurrent-ruby/file.thread_pools.html)?

Example:

pool = Concurrent::FixedThreadPool.new(5) 
pool.post do
  raise 'something goes wrong'
end

# how to rescue this exception here

Update:

Here is a simplified version of my code:

def process
  pool = Concurrent::FixedThreadPool.new(5)

  products.each do |product|
    new_product = generate_new_product

    pool.post do
      store_in_db(new_product) # here exception is raised, e.g. connection to db failed
    end
  end

  pool.shutdown
  pool.wait_for_termination
end

So what I want to achieve is to stop processing (break out of the loop) as soon as any exception is raised.

The exception is also rescued at a higher level of the application, where some cleanup jobs are executed (such as setting the model's state to failure and sending notifications).

David Moles
  • That depends on what kind of exception happens and how do you want to respond. Please extend your example. – Timo Schilling Nov 21 '16 at 12:47
  • I've updated original post with more explanations. –  Nov 21 '16 at 13:01
  • Have you looked for an appropriate `:fallback_policy`? – sschmeck Nov 21 '16 at 13:10
  • 2
    Fallback policy tells only how to handle tasks that are rejected when maximum queue size is exceeded or when pool is shut down. –  Nov 21 '16 at 17:11
  • We've been struggling with this for months now, too. Since there really doesn't seem to be any sensible literature around, I asked the developers: https://github.com/ruby-concurrency/concurrent-ruby/issues/616 Hopefully, they will reply. – Kalsan Dec 22 '16 at 16:14

3 Answers

3

The following answer is from jdantonio, posted at https://github.com/ruby-concurrency/concurrent-ruby/issues/616

" Most applications should not use thread pools directly. Thread pools are a low-level abstraction meant for internal use. All of the high-level abstractions in this library (Promise, Actor, etc.) all post jobs to the global thread pool and all provide exception handling. Simply pick the abstraction that best fits your use case and use it.

If you feel the need to configure your own thread pool rather than use the global thread pool, you can still use the high-level abstractions. They all support an :executor option which allows you to inject your custom thread pool. You can then use the exception handling provided by the high-level abstraction.

If you absolutely insist on posting jobs directly to a thread pool rather than using our high-level abstractions (which I strongly discourage) then just create a job wrapper. You can find examples of job wrappers in all our high-level abstractions, Rails ActiveJob, Sucker Punch, and other libraries which use our thread pools."

So how about an implementation with Promises? See http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Promise.html. In your case it would look something like this:

promises = []
products.each do |product|
  new_product = generate_new_product

  promises << Concurrent::Promise.execute do 
    store_in_db(new_product)
  end
end

# .value waits for the work to finish.
# The ! means that a failure is re-raised as an exception in the calling thread.
# .zip combines all the promises into a single Promise.
Concurrent::Promise.zip(*promises).value!
Doktor OSwaldo
  • 4
    This is not a reasonable answer. Any application should be able to use public api directly, because it is public. This api should be transparent for user. It shouldn't just ignore exceptions. – puchu Mar 02 '17 at 21:43
  • That actually has nothing to do with my answer. It is your opinion, as we can see here https://github.com/ruby-concurrency/concurrent-ruby/issues/634, and apparently not the opinion of everybody. I'm not saying you are wrong; maybe your way is better. But I provided a way to handle his problem, and even if it isn't the best answer, it is a perfectly valid answer that solved the problem for us. I see the advantages of your method, but the downvote is completely based on your opinion! – Doktor OSwaldo Mar 08 '17 at 10:19
  • 3
    You've offered people to change *thread pool* into *promises* just because ... thread pool don't want to support abortable workers. xDD Yea I think it is enough to click -1 button. – puchu Mar 09 '17 at 08:33
  • No, I offered people nothing; I let them know what answer there is in the official GitHub repo, which is a valid contribution to SO. – Doktor OSwaldo Mar 02 '20 at 18:22
  • How does one control concurrency with high-level abstractions such as promises? That has been the main concern for me, and I couldn't find any answers in the documentation. – Janko Jan 14 '21 at 12:04
1

There may be a better way, but this does work. You will want to change the error handling within wait_for_pool_to_finish.

def process
  pool = Concurrent::FixedThreadPool.new(10)
  errors = Concurrent::Array.new

  10_000.times do
    pool.post do
      begin
        # do the work
      rescue StandardError => e
        errors << e
      end
    end
  end
  wait_for_pool_to_finish(pool, errors)
end

private

def wait_for_pool_to_finish(pool, errors)
  pool.shutdown

  until pool.shutdown?
    if errors.any?
      pool.kill
      fail errors.first
    end
    sleep 1
  end

  pool.wait_for_termination
end
grossadamm
0

I've opened issue #634. The concurrent-ruby thread pool can support abortable workers without any problems.

require "concurrent"

Concurrent::RubyThreadPoolExecutor.class_eval do
  # Inspired by "ns_kill_execution".
  def ns_abort_execution aborted_worker
    @pool.each do |worker|
      next if worker == aborted_worker
      worker.kill
    end

    @pool = [aborted_worker]
    @ready.clear

    stopped_event.set
    nil
  end

  def abort_worker worker
    synchronize do
      ns_abort_execution worker
    end
    nil
  end

  def join
    shutdown

    # We should wait for stopped event.
    # We couldn't use timeout.
    stopped_event.wait nil

    @pool.each do |aborted_worker|
      # Rubinius could receive an error from aborted thread's "join" only.
      # MRI Ruby doesn't care about "join".
      # It will receive error anyway.

      # We can "raise" error in aborted thread and then "join" it from this thread.
      # We can "join" aborted thread from this thread and then "raise" error in aborted thread.
      # The order of "raise" and "join" is not important. We will receive target error anyway.

      aborted_worker.join
    end

    @pool.clear
    nil
  end

  class AbortableWorker < self.const_get :Worker
    def initialize pool
      super
      @thread.abort_on_exception = true
    end

    def run_task pool, task, args
      begin
        task.call(*args)
      rescue StandardError => error
        pool.abort_worker self
        raise error
      end

      pool.worker_task_completed
      nil
    end

    def join
      @thread.join
      nil
    end
  end

  self.send :remove_const, :Worker
  self.const_set :Worker, AbortableWorker
end

class MyError < StandardError; end

pool = Concurrent::FixedThreadPool.new 5

begin
  pool.post do
    sleep 1
    puts "we shouldn't receive this message"
  end

  pool.post do
    puts "raising my error"
    raise MyError
  end

  pool.join

rescue MyError => error
  puts "received my error, trace: \n#{error.backtrace.join("\n")}"
end

sleep 2

Output:

raising my error
received my error, trace:
...

This patch works fine with any version of MRI Ruby and Rubinius. It does not work on JRuby, and I don't care. Please patch the JRuby executor if you want to support it. It should be easy.

puchu