Ruby version: ruby-3.2.1

I need to read a CSV file that contains 50k rows. For each row, I need to call 7 APIs one by one, in order, to create an entry in a third-party application.

I am trying to use the async feature of the concurrent-ruby gem. Below is sample code.

require 'csv'
require "json"
require 'byebug'
require 'concurrent'

class Processor
  include Concurrent::Async

  def consume(c_row)
    ## Inside consume methods 7 APIS are called in sequence for each row
    puts "processing!!!! -> #{c_row}"
    # Simulated latency. In Ruby 0 is truthy, so test the remainder explicitly.
    sleep 1 if (c_row % 2).zero?
    sleep 5 if (c_row % 3).zero?
  end
end


class ImporterAsync
  def self.perform(master_file)
    begin
      ## CSV has 50k entries ##
      CSV.foreach(master_file, headers: true).with_index(1) do |row|
        Processor.new.async.consume(row)
      end
    rescue StandardError => e
      puts "Main script failed: #{e.message}"
    ensure
      puts 'closing'
    end
  end
end

ImporterAsync.perform('master_file.csv')

I get no output from the code when I invoke the processor through the async proxy: Processor.new.async.consume(row)

But the code executes fine without async: Processor.new.consume(row).

I believe running the work asynchronously will help my code handle this load better. I am also open to suggestions.

Thanks in advance - Ajith

Ajith
  • Adding a sleep of 0.1 seconds inside the main loop makes the consume method run asynchronously. – Ajith May 22 '23 at 03:23

2 Answers


Your program effectively exits immediately after creating the async threads. You need to wait for them or they'll terminate with the rest of the program. Calling a method through async actually returns a Concurrent::IVar, which you can use to track each piece of work and read its result with #value.

class ImporterAsync
  def self.perform(master_file)
    futures = []
    begin
      CSV.foreach(master_file, headers: true).with_index(1) do |row|
        future = Processor.new.async.consume(row)
        futures.push(future)
      end
    rescue StandardError => e
      puts "Main script failed: #{e.message}"
    ensure
      puts 'closing'
    end

    futures.each do |t|
      t.wait
      puts "finished (#{t.value.to_s[0...40].strip}) #{Time.now}"
    end
    puts "ALL DONE"
  end
end

This is an altered version of your class that collects each IVar into an array. After starting them all, it waits for each one to finish, so the program waits for ALL of them before exiting. (Note that "finished" can also mean an exception was raised; it doesn't imply a "good" finish.)

tgf
  • Thank you. I did not know that you can also wait on async calls. I was under the impression that async is fire-and-forget and that waiting was only for Concurrent::Future. Your explanation helped me. – Ajith May 23 '23 at 06:14
  • Happy I could be helpful. Good luck! Also, you may want to look into Fibers at some point as well, I believe they're supposed to be more efficient. Since you're hitting an API, your threads will mostly be waiting and fibers should work for that too. – tgf May 23 '23 at 07:57
  • After processing around 200 rows, I start getting the error Net::ReadTimeout with #. I have already set read_timeout to 120. – Ajith May 24 '23 at 15:06
  • This is probably worth asking in a new question. But as a guess, I'd wonder if there isn't a connection limit on how many connections you can open to the same service. (Are they all going to the same service?) Maybe processing in groups of 100 at a time or giving each thread 100 rows to process sequentially would work better. You'll probably also need to handle failures, since 10_000 requests is very likely to produce at least an error or two. – tgf May 25 '23 at 00:56
  • One more suggestion. On the futures you don't have to just wait - you could loop and watch their `#state`. I think the possible values are `:pending, :fulfilled, :rejected`. Maybe then re-enqueue any that fail once? – tgf May 25 '23 at 01:03

Actually, I referred to the link below to understand what is happening.

Celluloid async inside ruby blocks does not work

But I am still open to suggestions.

Ajith