
I am implementing a simple pub/sub system in Ruby using only the standard library. A client can subscribe to a topic or publish a message to a topic, and each published message is sent to all of that topic's subscribers. I'm using a plain TCPServer and assuming every message fits on a single line, read with gets. Threads communicate through Ruby Queues so that they avoid touching shared memory unless it is thread-safe.

I have reached a point where I can't see any way forward other than sharing my TCPSocket client between threads: one thread needs to block in a loop awaiting new data on the socket, while the other needs to await newly published messages and write them to the client. Are TCPSockets in Ruby thread-safe? If not, would a simple dup or clone call suffice, so that each thread has its own reference to the socket?

For reference, here is my basic implementation:

require 'socket'

socket = TCPServer.new(4242)

processing_queue = Queue.new
Thread.start({}) do |subscriptions|
  while event = processing_queue.pop
    command, *args = event
    case command
    when 'publish'
      topic, message = args
      # Fall back to an empty list so publishing to a topic with no subscribers doesn't raise
      (subscriptions[topic] || []).each do |subscription_queue|
        subscription_queue.push(message)
      end
    when 'subscribe'
      topic, subscription_queue = args
      (subscriptions[topic] ||= []) << subscription_queue
    end
  end
end

loop do
  Thread.start(socket.accept) do |client|
    writer_queue = Queue.new
    Thread.start do
      while response = writer_queue.pop
        client.puts(response)
      end
    end

    while request = client.gets
      command, *args = request.split(' ')
      case command
      when 'subscribe'
        topic = args[0]
        Thread.start(Queue.new) do |subscription_queue|
          processing_queue << ['subscribe', topic, subscription_queue]
          while message = subscription_queue.pop
            writer_queue << message
          end
        end
        writer_queue << 'OK'
      when 'publish'
        topic = args.shift
        message = args.join(' ')
        processing_queue << ['publish', topic, message]
        writer_queue << 'OK'
      end
    end
    client.close
  end
end

socket.close
Nick Hyland
  • "Are TCPSockets in ruby thread safe?" Sharing them between threads? Not a great idea. Normally you accept on one thread and hand off connection to another thread or, as is more common, create a thread per connection. – tadman Nov 07 '22 at 18:44
  • Sure but what to do when implementing full duplex connection? I can have one connection per thread which is blocking awaiting new messages, but I also need to write on this socket based on published messages from another thread. I could send these messages on a queue to the connections thread, but then I'm blocking waiting on queue messages and no longer awaiting new connection messages. Am I missing a simple pattern to implement this? My reasoning above is using one thread to do all the writing to the socket to prevent two writes at same time – Nick Hyland Nov 07 '22 at 18:52
  • If you need to coordinate with some kind of central server thread, it's probably better to start thinking in terms of message passing. This can be tricky in and of itself, which is why tools like [Async](https://github.com/socketry/async) exist to help juggle multiple tasks within a single thread. – tadman Nov 07 '22 at 19:17
  • Yeah this is the thing though, I am passing messages, which is why I'm using queues to communicate between threads. I still though don't see a way to be able to limit the connection to one thread. I still seem to need at least one for reading and one for writing to the same socket. I'm avoiding using tools as I'm trying to demonstrate the difficulties in building something like this from scratch – Nick Hyland Nov 07 '22 at 20:19
  • You'll need to dig into non-blocking more in order to make this work. Your worker thread will need to keep an eye on incoming messages, as well as incoming network activity for the connection it manages, and the main thread will need to coordinate the connection acceptance and delegation, as well as receive/send messages from the queues to the workers. – tadman Nov 07 '22 at 21:35
  • Yes I could use non blocking methods to read and peek, but I’ll then either use up cpu, or have to use sleep for arbitrary amounts of time. This code does work by the way. I’m just trying to find a way to get it to work using standard practices. If you think this problem is interesting can you please give it a vote – Nick Hyland Nov 08 '22 at 08:20
  • You don't necessarily need to use an arbitrary sleep. The way Async works internally is building on things like [`nio4r`](https://github.com/socketry/nio4r) which allow a more flexible notification system for things like network activity. If you want to approach this from first principles, sure, but you just can't solve this elegantly without using something like [`epoll`](https://man7.org/linux/man-pages/man7/epoll.7.html) or [`kqueue`](https://en.wikipedia.org/wiki/Kqueue) in the background. – tadman Nov 08 '22 at 20:03
  • You can do primitive things like have your threads sending signals back and forth, but it gets out of hand really quickly. Historically software was written as a single thread, but with lots of non-blocking calls and tools like [`select`](https://rubyapi.org/3.1/o/io#method-c-select) to wake up when activity occurs. There's a ton of simple C code that uses this approach, no threads required. – tadman Nov 08 '22 at 20:05
  • Yeah I figured a single threaded approach would eventually be much easier given I could implement in nodejs without dealing with all this headache. I have an implementation as a single thread which I took inspiration from [here](https://gist.github.com/sandro/1192557/1b04b0b71b0dc1e5d61e38519a8bdc57a79f535b). I was hoping though to get a more complete solution using the multi threaded approach without using 3rd party libs. The purpose of the exercise was to demonstrate the difficulties in implementing such systems. I think I have demonstrated that :) – Nick Hyland Nov 09 '22 at 09:03
  • Node.js has the advantage of being deeply async, but it's also single-threaded (excluding the seldom used WebWorkers system). I'd say Ruby + Async is the closest thing you can get to the Node experience, or better since you can still use threads. – tadman Nov 09 '22 at 20:04
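
The busy-polling concern raised in the comments above (spinning the CPU or sleeping for arbitrary intervals) can possibly be avoided with the standard library alone by pairing each outgoing queue with a pipe, often called the self-pipe trick. The following is only a minimal sketch, not the asker's code: `SelectableQueue` and `serve_connection` are hypothetical names introduced for illustration. A single thread per connection then waits in `IO.select` on both the socket and the queue's pipe, so no socket is shared between threads.

```ruby
require 'socket'

# Hypothetical helper: a Queue paired with a pipe so IO.select can wait on it.
class SelectableQueue
  def initialize
    @queue = Queue.new
    @reader, @writer = IO.pipe
  end

  # IO that becomes readable whenever at least one message is queued.
  def wakeup_io
    @reader
  end

  def push(message)
    @queue << message
    @writer.write('.') # one wake-up byte per message; may block if the pipe buffer fills up
    self
  end

  # Call only after IO.select has reported wakeup_io as readable.
  def pop
    @reader.read(1) # consume the wake-up byte that matches this message
    @queue.pop
  end
end

# One thread per connection: waits on the socket and the outbox at the same time.
# Each complete incoming line is yielded to the caller's block.
def serve_connection(client, outbox)
  loop do
    readable, = IO.select([client, outbox.wakeup_io])
    readable.each do |io|
      if io == outbox.wakeup_io
        client.puts(outbox.pop)   # a message arrived from another thread
      else
        line = client.gets
        return if line.nil?       # peer closed the connection
        yield line.chomp
      end
    end
  end
ensure
  client.close
end
```

Other threads would only ever call `outbox.push(message)`; the connection thread stays the sole reader and writer of its socket. This sketch still assumes, as the question does, that clients send complete newline-terminated lines.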

1 Answer


To avoid the headache of worrying about multiple threads accessing shared resources, I have created a single-threaded approach based on this great simple example.

This solution can be improved: it should really use non-blocking reads and writes on the sockets instead of puts and gets, but I am just implementing something basic for demonstration purposes.

I would still, however, like to hear more about the multithreaded approach, as I feel there must be a more complete solution out there that people have used. If anyone has suggestions, please answer.

require 'socket'

server = TCPServer.open("0.0.0.0", 4242)
read_fds = [server]
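# The block form gives each topic its own array; Hash.new([]) would hand out
# one shared default array, so every topic would see the same subscribers.
subscriptions = Hash.new { |hash, topic| hash[topic] = [] }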
while true
  puts 'loop'
  if ios = select(read_fds, [], [])
    selected_reads = ios.first
    p selected_reads
    selected_reads.each do |client|
      if client == server
        puts 'Someone connected to server. Adding socket to read_fds.'
        client = server.accept # TCPServer#accept returns only the connected socket
        read_fds << client
      elsif client.eof?
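        puts "Client disconnected"
        read_fds.delete(client)
        # Drop the socket from every topic so later publishes don't write to a closed socket
        subscriptions.each_value { |subscribers| subscribers.delete(client) }
        client.close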
      else
        # Perform a blocking-read until new-line is encountered.
        # We know the client is writing, so as long as it adheres to the
        # new-line protocol, we shouldn't block for long
        puts "Reading..."
        request = client.gets
        action, topic, *args = request.split(' ')
        if action == 'subscribe'
          subscriptions[topic] << client
          client.puts('OK')
        elsif action == 'publish'
          message = args.join(' ')
          # Should also be writing to socket in non blocking way using select
          # but again keeping simple assuming all messages to write are small
          subscriptions[topic].each { |subscriber| subscriber.puts(message) }
          client.puts('OK')
        else
          puts "Invalid request #{action}"
          client.puts('ERROR')
        end
      end
    end
  end
end
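
The non-blocking reads mentioned above could be sketched roughly as follows. This is an illustration under assumptions rather than part of the solution: `LineBuffer` is a hypothetical helper that keeps one buffer per client, reads whatever bytes are available with `read_nonblock`, and returns only complete lines, so a client that sends half a line cannot stall the select loop.

```ruby
# Hypothetical helper: accumulates bytes for one client and returns complete lines only.
class LineBuffer
  def initialize
    @buffer = String.new
  end

  # Reads whatever is currently available without blocking.
  # Returns an array of complete lines (possibly empty), or :eof once the peer has closed.
  def read_lines(io)
    chunk = io.read_nonblock(4096, exception: false)
    return :eof if chunk.nil?
    @buffer << chunk unless chunk == :wait_readable

    lines = []
    while (index = @buffer.index("\n"))
      # Cut the line (including its newline) out of the buffer, then strip the newline.
      lines << @buffer.slice!(0..index).chomp
    end
    lines
  end
end
```

In the loop above, this would take the place of both the `eof?` check and the `gets` call: keep one `LineBuffer` per accepted socket, call `read_lines` whenever `select` reports that socket as readable, treat `:eof` as a disconnect, and handle each returned line as a request.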
Nick Hyland