
I have 2.5 million records in the Content table of my Postgres database. I need to go through each of those records, perform a number of actions (many of which are slow on their own), and update the record at the end based on what I have gathered along the way. That all works; the problem is that it takes bloody forever to run.

I came across a couple of articles about multithreading jobs like this (I have done this before in C, but never in Ruby) and about the pros and cons of using threads in Ruby. Despite those cons, the 2000 threads I can get started finish significantly faster than the same work without threading, but I can only start about 2000 at a time, which keeps me from actually updating all 2.5 million records. Here is the code I had for that:

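threads = []
index = 0

# One new thread per record; this is the version that tops out
# at around 2000 threads.
Content.all.each do |content|
  threads << Thread.new do
    grab_and_store(content)
  end
  index += 1
  puts index if (index % 100).zero? # progress every 100 records
end
threads.each(&:join)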
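One simple way to keep the thread count bounded without a pool library is to start threads in fixed-size slices and join each slice before starting the next. The sketch below uses plain Ruby only; `fake_grab_and_store` is a hypothetical stand-in for `grab_and_store`, and the integers stand in for Content records.

```ruby
# Hypothetical stand-in for grab_and_store: pretend to do slow I/O.
def fake_grab_and_store(record)
  sleep 0.001
  record * 2
end

# Process items in slices of batch_size, so at most batch_size threads
# exist at any moment. Each slice is fully joined before the next starts.
def process_in_batches(items, batch_size)
  results = []
  items.each_slice(batch_size) do |slice|
    threads = slice.map { |item| Thread.new { fake_grab_and_store(item) } }
    # Thread#value joins the thread and returns the block's result.
    results.concat(threads.map(&:value))
  end
  results
end

out = process_in_batches((1..250).to_a, 50)
puts out.size # => 250
```

The trade-off is that each slice runs only as fast as its slowest record; a worker pool, where idle threads immediately pick up the next job, avoids that.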

I also read about thread pooling, where the same threads pick up new jobs once they have finished their original one, but I can't seem to get it to work. Here is the code I had:

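POOL_SIZE = 1000

jobs = Queue.new
Content.all.each { |x| jobs.push x }

workers = POOL_SIZE.times.map do
  Thread.new do
    begin
      # pop(true) is non-blocking: it raises ThreadError once the queue is empty
      while (x = jobs.pop(true))
        grab_and_store(x)
      end
    rescue ThreadError
      # queue drained; let this worker finish
    end
  end
end
workers.each(&:join)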

When I run this I get an error saying I can't call .join on nil, which would mean that workers is nil at the end. But when I run the code I based this on (shown below), it works perfectly. I can't figure out where mine is breaking, or how best to implement a thread pool so that my code stops running out of resources after 2000 threads.
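For comparison, here is a minimal sketch of a fixed-size pool in plain Ruby (2.3 or later, which added `Queue#close`). Instead of relying on `pop(true)` raising `ThreadError`, it closes the job queue after filling it, so a blocking `pop` returns `nil` once the queue is drained. `run_pool` is a hypothetical helper name, and the block passed to it stands in for `grab_and_store`.

```ruby
# Run the given block on every item using exactly pool_size long-lived threads.
def run_pool(items, pool_size, &work)
  jobs = Queue.new
  items.each { |item| jobs << item }
  jobs.close # pop on a closed, empty queue returns nil instead of blocking

  results = Queue.new # Queue is thread-safe, so workers can push concurrently
  workers = Array.new(pool_size) do
    Thread.new do
      # Loop ends when pop returns nil (queue closed and drained).
      # Note: a nil or false item would also end the loop early.
      while (item = jobs.pop)
        results << work.call(item)
      end
    end
  end
  workers.each(&:join) # also re-raises any exception a worker died with

  Array.new(results.size) { results.pop }
end

squares = run_pool(1..100, 5) { |n| n * n }
puts squares.sort.first(3).inspect # => [1, 4, 9]
```

Results come back in completion order, not input order. For 2.5 million records, pushing everything into the queue up front keeps all of them in memory; a `SizedQueue` filled by a producer loop would bound that, at the cost of a little more code.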

Thanks!

P.S. Here's the code from the tutorial I used:

require 'thread'
work_q = Queue.new
(0..50).each { |x| work_q.push x }
workers = (0...4).map do
  Thread.new do
    begin
      while (x = work_q.pop(true))
        50.times { print [128000 + x].pack("U*") }
      end
    rescue ThreadError
    end
  end
end
workers.map(&:join)
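The tutorial's pool has no explicit stop signal: `Queue#pop(true)` never blocks, and on an empty queue it raises `ThreadError`, which the `rescue` turns into a normal thread exit. The sketch below reuses that pattern but records what each worker consumed (the `seen` array and its `Mutex` are additions for illustration), which shows that the 51 items are each handled exactly once across the 4 workers.

```ruby
work_q = Queue.new
(0..50).each { |x| work_q.push x }

seen = []
lock = Mutex.new # Array is not thread-safe, so guard the shared list
workers = (0...4).map do
  Thread.new do
    begin
      # pop(true) never waits; on an empty queue it raises ThreadError,
      # which is how each worker learns there is no more work.
      while (x = work_q.pop(true))
        lock.synchronize { seen << x }
      end
    rescue ThreadError
      # Queue drained: the thread simply finishes.
    end
  end
end
workers.each(&:join)
puts seen.size # => 51
```

One caveat of this pattern: if the work inside the loop itself raises a `ThreadError`, the same `rescue` swallows it and that worker quietly stops.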

Update:

Following Anthony's answer, I ended up with the following code, using the ruby-thread gem he recommended. It runs through the given Content really quickly (a sample of 1000 records), but when I check in the console, it appears to have saved only about 20 at most. Here's the code:

require 'thread/pool'

pool = Thread.pool(5)

index = 0
@ids = []
# limit(1000) fetches only the sample instead of loading every match and slicing
arr = Content.where(needs_update: true).limit(1000)

puts "Starting With Sample 1000"

arr.each do |content|
  pool.process do
    grab_and_store(content)
  end
  index += 1
  puts index if (index % 100).zero?
end

pool.shutdown
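When jobs report success but rows are not saved, it is worth checking whether failures are being lost. Two common causes, stated here as assumptions since `grab_and_store` isn't shown: an exception raised inside a job may never reach the main thread, depending on the pool implementation; and ActiveRecord's `save`/`update` return `false` on validation failure rather than raising, whereas `save!`/`update!` raise. The plain-Ruby sketch below (not the ruby-thread gem) collects failures explicitly; `flaky_store` is a hypothetical stand-in that fails for every seventh id.

```ruby
# Hypothetical stand-in for grab_and_store that fails for some records.
def flaky_store(id)
  raise ArgumentError, "could not save #{id}" if (id % 7).zero?
  id
end

saved  = Queue.new
failed = Queue.new
jobs   = Queue.new
(1..100).each { |id| jobs << id }
jobs.close # pop returns nil once drained

workers = Array.new(5) do
  Thread.new do
    while (id = jobs.pop)
      begin
        saved << flaky_store(id)
      rescue StandardError => e
        # Record the failure instead of letting it vanish with the job.
        failed << [id, e.message]
      end
    end
  end
end
workers.each(&:join)

puts "saved=#{saved.size} failed=#{failed.size}" # => saved=86 failed=14
```

In the real job, the equivalent would be rescuing inside the `pool.process` block and logging the Content id and error message, so the count of failures can be compared with the count of missing updates.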
Zubatman
  • Have you tried with a smaller pool size? There's almost certainly very little to be gained with such a large number of threads. – Frederick Cheung Sep 16 '15 at 07:49
  • I shrunk the thread pool size down to around 5 when I switched to the ruby-thread gem, but it appears to not be saving all of the records, at max it'll save 50 / 1000 at a time – Zubatman Sep 16 '15 at 14:44

1 Answer

I've used the ruby-thread gem, which adds pool support, like so:

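require 'thread/pool'

pool = Thread.pool(50)

# find_each loads records in batches (1000 by default)
# instead of loading all 2.5 million rows at once
Content.find_each do |content|
  pool.process do
    grab_and_store(content)
  end
end

pool.shutdown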
Anthony
  • I used this approach. When I log the ids of the Content updated, it has the right count for my sample size, but when I check in the console which ones have been saved, it has actually saved only about 5 of them and hasn't updated the rest of the ones it has supposedly processed. Any ideas what could be going wrong there? – Zubatman Sep 16 '15 at 14:21
  • The only problem I see here is that ActiveRecord/your database likely has a connection pool size smaller than the thread pool you're using, which might be limiting connections. What is listed in your database.yml file? – Anthony Sep 16 '15 at 14:27
  • In development it has the pool size at 5. – Zubatman Sep 16 '15 at 14:31
  • What would you need to know / what would help? – Zubatman Sep 16 '15 at 14:42
  • What does grab_and_store do? Can we raise an exception when something goes to update but can't (for some reason)? Does it work on a smaller scale (fewer records? fewer threads?) – Anthony Sep 16 '15 at 14:44
  • I'm running it right now with a sample size of 1000 on > 3 threads; I got at most 50 saved when I ran it with 2 threads. grab_and_store calls about 5 other methods (grabbing data and making sense of it) and then updates the facets of the Content object based on what was "grabbed". – Zubatman Sep 16 '15 at 14:51
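The connection-pool point can be illustrated without Rails. ActiveRecord keeps a pool of database connections sized by `pool:` in database.yml; a thread that needs a connection while all are checked out waits and, after the checkout timeout, raises `ActiveRecord::ConnectionTimeoutError`. ActiveRecord provides `ActiveRecord::Base.connection_pool.with_connection { ... }`, which returns the connection when the block ends. The sketch below is a hypothetical toy pool (`ToyConnectionPool`, not ActiveRecord) showing the same shape: 20 threads share 5 connections, and each job checks one out and always gives it back.

```ruby
# A toy stand-in for a database connection pool (hypothetical; not ActiveRecord).
class ToyConnectionPool
  attr_reader :max_in_use

  def initialize(size)
    @available = Queue.new
    size.times { |i| @available << "conn-#{i}" }
    @in_use = 0
    @max_in_use = 0
    @lock = Mutex.new
  end

  # Check a connection out, yield it, and always return it afterwards.
  def with_connection
    conn = @available.pop # blocks until a connection is free
    @lock.synchronize do
      @in_use += 1
      @max_in_use = [@max_in_use, @in_use].max
    end
    yield conn
  ensure
    if conn
      @lock.synchronize { @in_use -= 1 }
      @available << conn # hand the connection to the next waiting thread
    end
  end
end

pool = ToyConnectionPool.new(5)
threads = Array.new(20) do |i|
  Thread.new do
    pool.with_connection { |conn| sleep 0.01; "#{conn} did job #{i}" }
  end
end
results = threads.map(&:value)
puts pool.max_in_use # never more than 5
```

In the real job, wrapping the body of each `pool.process` block in `ActiveRecord::Base.connection_pool.with_connection` and keeping the thread count at or below the `pool:` value in database.yml would be the corresponding change to try.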