
I am trying to create a generic way of iterating Enumerables using multiple processors. I am spawning a given number of workers using fork and feeding them data to process, reusing workers as they become idle. However, I would like to synchronize the input and output order. If job 1 and job 2 are started simultaneously and job 2 is completed before job 1, then the result order is out of sync. I would like to cache the output on the fly somehow to synchronize the output order, but I fail to see how this can be done.

#!/usr/bin/env ruby

require 'pp'

DEBUG = false
CPUS  = 2

module Enumerable
  # Fork each (feach) creates a fork pool with a specified number of processes
  # to iterate over the Enumerable object, processing the specified block.
  # Calling feach with :processes => 0 disables forking for debugging purposes.
  # It is possible to disable synchronized output with :synchronize => false,
  # which will save some overhead.
  #
  # @example - process 10 elements using 4 processes:
  #
  # (0 ... 10).feach(:processes => 4) { |i| puts i; sleep 1 }
  def feach(options = {}, &block)
    $stderr.puts "Parent pid: #{Process.pid}" if DEBUG

    procs = options[:processes]   || 0
    sync  = options.fetch(:synchronize, true)  # options[:synchronize] || true would always be true

    if procs > 0
      workers = spawn_workers(procs, &block)
      threads = []

      self.each_with_index do |elem, index|
        $stderr.puts "elem: #{elem}    index: #{index}" if DEBUG

        threads << Thread.new do 
          worker = workers[index % procs]
          worker.process(elem)
        end

        if threads.size == procs
          threads.each { |thread| thread.join }
          threads = []
        end
      end

      threads.each { |thread| thread.join }
      workers.each { |worker| worker.terminate }
    else
      self.each do |elem|
        block.call(elem)
      end
    end
  end

  def spawn_workers(procs, &block)
    workers = []

    procs.times do 
      child_read, parent_write = IO.pipe
      parent_read, child_write = IO.pipe

      pid = Process.fork do
        begin
          parent_write.close
          parent_read.close
          call(child_read, child_write, &block)
        ensure
          child_read.close
          child_write.close
        end
      end

      child_read.close
      child_write.close

      $stderr.puts "Spawning worker with pid: #{pid}" if DEBUG

      workers << Worker.new(parent_read, parent_write, pid)
    end

    workers
  end

  def call(child_read, child_write, &block)
    while not child_read.eof?
      elem = Marshal.load(child_read)
      $stderr.puts "      call with Process.pid: #{Process.pid}" if DEBUG
      result = block.call(elem)
      Marshal.dump(result, child_write)
    end
  end

  class Worker
    attr_reader :parent_read, :parent_write, :pid

    def initialize(parent_read, parent_write, pid)
      @parent_read  = parent_read
      @parent_write = parent_write
      @pid          = pid
    end

    def process(elem)
      Marshal.dump(elem, @parent_write)
      $stderr.puts "   process with worker pid: #{@pid} and parent pid: #{Process.pid}" if DEBUG
      Marshal.load(@parent_read)
    end

    def terminate
      $stderr.puts "Terminating worker with pid: #{@pid}" if DEBUG
      @parent_write.close  # closing the pipe gives the child EOF so its loop can finish
      @parent_read.close
      Process.wait(@pid)   # reap the child; WNOHANG here could leave a zombie
    end
  end
end

def fib(n) n < 2 ? n : fib(n-1)+fib(n-2); end # Lousy Fibonacci calculator <- heavy job

(0 ... 10).feach(processes: CPUS) { |i| puts "#{i}: #{fib(35)}" }
  • Consider having your child processes write to a database. You can pass the children enough info, perhaps a job number, which they then store along with their results. A clean-up script can then walk the table, with an `order by` clause to retrieve the results in order. – the Tin Man Mar 27 '14 at 21:21
  • @theTinMan I would like to avoid databases or anything bound to limiting disk I/O. I can send messages/results to and from children through IO pipes - so instead of a database I reckon a simple memory-based cache (Ruby object), and a method to control that cache instead of a clean-up script. – maasha Mar 28 '14 at 10:19

1 Answer


There is no way to sync the output unless you force all the child processes to send their output to the parent and have it sort the results, or you enforce some kind of I/O locking between processes.
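
One way the parent could do that sorting on the fly is a small in-memory reorder buffer: tag each job with its index, park results that arrive early in a Hash, and flush them as soon as the next expected index shows up. A minimal sketch of just the buffering logic, independent of the fork pool above (the emit lambda and the hard-coded results are purely illustrative):

# Reorder buffer sketch: results may arrive in any order, each tagged with the
# index of the job that produced it; they are printed strictly in index order.
buffer   = {}    # index => result, for results that arrived too early
next_idx = 0     # index of the next result allowed to be printed

emit = lambda do |index, result|
  buffer[index] = result
  while buffer.key?(next_idx)          # flush every consecutive result
    puts buffer.delete(next_idx)
    next_idx += 1
  end
end

# Results arriving out of order are still printed as 0, 1, 2, 3:
emit.call(2, "result 2")
emit.call(0, "result 0")
emit.call(1, "result 1")
emit.call(3, "result 3")

In your feach the parent already gets each result back through the pipe, so it would only need to pass the element's index along and route the returned value through something like this instead of printing inside the block.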

Without knowing what your long-term goal is, it's difficult to suggest a solution. In general, you'll need a lot of work in each process to gain any significant speedup using fork, and there is no simple way to get results back to the main program.

Native threads (pthreads on Linux) might make more sense for what you are trying to do; however, not all versions of Ruby support threads at that level. See:

Does ruby have real multithreading?
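
If a Ruby with kernel-level threads (JRuby, for instance) is an option, each element can be handled in its own thread that writes its result into a slot keyed by the element's index, so the output order matches the input order regardless of which thread finishes first. A minimal sketch of that idea (under MRI the GIL limits the benefit to IO-bound blocks; the squaring is just a stand-in for real work):

items   = (0...10).to_a
results = Array.new(items.size)        # one slot per element, in input order

threads = items.each_with_index.map do |elem, index|
  Thread.new { results[index] = elem * elem }   # stand-in for the real block
end

threads.each(&:join)
puts results.inspect                   # results appear in input order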

  • I want to use this feach method to parse huge files by reading in chunks and delegating the actual parsing to the workers. I can send chunks to the workers and results to the parent using IO pipes. A smart cache and controller detecting what jobs are finished and outputting them in order should be doable - I think. – maasha Mar 28 '14 at 10:25
  • I am not interested in Ruby Threads as they don't support multi-processor work - at least not in MRI. – maasha Mar 28 '14 at 12:45
  • Look at JRuby. The underlying Java has real threads. But define "huge files". And have you looked into an MQ architecture? They can be amazingly fast. – the Tin Man Mar 28 '14 at 14:40
  • "Huge files" are multi-GB files in multi-line table formats (look up FASTA and FASTQ). Again, I want this to work in MRI, and I feel I am close. The problem is catching stdout in the workers and sending that back to the parent. – maasha Apr 02 '14 at 10:05