I am currently implementing a simplistic pub/sub system in ruby using only the standard lib, where a client can subscribe to a topic for messages or publish messages to a topic, which will be sent to all subscribers. I'm using just standard TCPServer and assuming all messages are on a single line read using gets
. I'm using ruby Queue
s to communicate between threads and avoid accessing shared memory resources unless they are thread safe. I have reached a point where I can't see any other way to continue other than share my TCPSocket
client between threads as one thread needs to block in a loop while awaiting new data on the socket, while the other needs to await new messages published and write them to the client. Are TCPSocket
s in ruby thread safe? If not would a simple dup
or clone
call suffice and have each thread with it's own reference to the socket?
For code reference I have the following basic implementation
require 'socket'
socket = TCPServer.new(4242)
processing_queue = Queue.new
Thread.start({}) do |subscriptions|
while event = processing_queue.pop
command, *args = event
case command
when 'publish'
topic, message = args
subscriptions[topic].each do |subscription_queue|
subscription_queue.push(message)
end
when 'subscribe'
topic, subscription_queue = args
subscriptions[topic] = [] if subscriptions[topic].nil?
subscriptions[topic] << subscription_queue
end
end
end
loop do
Thread.start(socket.accept, Queue.new) do |client, queue|
writer_queue = Queue.new
Thread.start do
while response = writer_queue.pop
client.puts(response)
end
end
while request = client.gets
command, *args = request.split(' ')
case command
when 'subscribe'
topic = args[0]
Thread.start(Queue.new) do |subscription_queue|
processing_queue << ['subscribe', topic, subscription_queue]
while message = subscription_queue.pop
writer_queue << message
end
end
writer_queue << 'OK'
when 'publish'
topic = args.shift
message = args.join(' ')
processing_queue << ['publish', topic, message]
writer_queue << 'OK'
end
end
client.close
end
end
socket.close