1

In the Ruby IO.pipe example from the documentation, a single message is passed through the processes.

I wanted to do something similar, with two differences:

  1. use threads instead of processes
  2. use a pipe for ongoing messaging, not for a once-off message

This is the obvious, but non-working code:

rd, wr = IO.pipe

reader_thread = Thread.new(rd) do |rd|
  data_received = rd.read
  puts "Read: #{data_received.inspect}"
end

write_thread = Thread.new(wr) do |wr|
  wr.write "Message"
  wr.flush
end

write_thread.join
reader_thread.join

which causes reader_thread to hang on rd.read.

I could make it work by using IO#read_nonblock:

reader_thread = Thread.new(rd) do |rd|
  data_received = \
    begin
      rd.read_nonblock(100)
    rescue IO::WaitReadable, IO::EAGAINWaitReadable
      IO.select([rd])
      retry
    end

  puts "Read: #{data_received.inspect}"
end

Is this the correct pattern? Or is using IO.pipe the wrong tool for threads messaging?

Marcus
  • 5,104
  • 2
  • 28
  • 24

2 Answers2

0

You can also use a Queue for safely exchanging information between multiple threads:

q = Queue.new

reader_thread = Thread.new(q) do |q|
  data_received = q.pop
  puts "Read: #{data_received.inspect}"
end

write_thread = Thread.new(q) do |q|
  q.push "Message"
end

write_thread.join
reader_thread.join
wjordan
  • 19,770
  • 3
  • 85
  • 98
0

Your reader thread is hanging because with no arguments, IO.read will read -- and block -- until it encounters an EOF. (If you pass a length, it will read until it reads that many bytes, or an EOF, whichever happens first, so it will still block until it gets at least that much input.) This is explained in detail in the IO.pipe docs.

If you call wd.close before reader_thread.join, read will get that EOF and you'll get your output -- all at once, when read unblocks.

In a realistic scenario, you probably don't just want to read once, you probably want to loop until rd encounters an EOF, doing something with the data along the way. The simplest thing is just to read one byte at a time, with read(1). (I'm omitting the separate writer thread to keep things simple -- and you should too, unless you really need three separate instruction streams; often you'll want a background reader thread or a background writer thread, with the main thread handling the other end -- but the behavior's basically the same.

text = <<~TEXT.strip
  Lorem ipsum dolor sit amet, consectetur adipiscing elit, 
  sed do eiusmod tempor incididunt ut labore et dolore magna 
  aliqua.
TEXT

read_io, write_io = IO.pipe

reader_thread = Thread.new(read_io) do |io|
  puts('Reading:')
  while (c = io.read(1)) # block till we read one byte
    $stdout.write(c)
  end
  puts('...Done.')
end

# Write 50 chars/second, so we can see them get read one at a time
text.chars.each { |c| write_io.write(c); sleep(0.02) } 

reader_thread.join

# => Reading:
#    Lorem ipsum dolor sit amet, consectetur adipiscing elit, 
#    sed do eiusmod tempor incididunt ut labore et dolore magna 
#    aliqua.

This still hangs, though, because IO.read(1) is still waiting for that EOF, so again, you'll need to close write_io.

Also, it's usually not very efficient to read byte by byte. Realistically you'll probably want an 8K buffer, or even larger, depending on your use case.

reader_thread = Thread.new(read_io) do |io|
  puts('Reading:')
  while (c = io.read(8192))
    $stdout.write(c)
  end
  puts('...Done.')
end

# We're writing 50 chars/second, but we won't see them print out
# till `read_io` has read 8192 bytes, or hit an EOF
text.chars.each { |c| write_io.write(c); sleep(0.02) }

write_io.close      # we have to close `write_io` *sometime* --
reader_thread.join  # -- or this will hang.

# => Reading:
#    Lorem ipsum dolor sit amet, consectetur adipiscing elit, 
#    sed do eiusmod tempor incididunt ut labore et dolore magna 
#    aliqua....Done.
David Moles
  • 48,006
  • 27
  • 136
  • 235