I'm having the same issue as the poster of Accessing a variable within a rails thread but I find that the answer does not provide an actual answer but rather details of how to test it.
I'm following the same Heroku article at https://devcenter.heroku.com/articles/ruby-websockets
I'm building a feature into our webapp app that will notify client browsers when background processes complete.
The basic flow is that a user will initiate a long running process by submitting to a REST service. This service will spin of a Resque background process and send a response back to the client app that the process has started. When the background process has started, the server will send a websocket to the client to notify the user that it has completed.
What I have in place is a Rack middleware class that will listen for websocket connections and add that connection to a instance variable array. If it is not a websocket connection, then the call in continued down the chain.
The middleware class also subscribes to a redis pub/sub event so that it can be notified when background processes complete. The clients in the instance variable array should then be iterated over in the subscribe block to send a websocket message to the client. However, as pointed out in the other post, the client array is empty when in the subscribe block
I've already taken steps to make sure it is thread safe by wrapping access to the instance variable with a mutex
I can see that the clients array is being incremented when a browser connects and I can see it disconnects properly. Additionally, I can see that my pub / sub code is working fine
I've gone through the log messages and I don't see anything wrong as in multiple instances being created or connections being deleted.
Some investigation / discussions have indicated that the variable will not be available in the thread but I don't believe that is true. We are not forking the code here which would result in a new memory space.
any idea whey the instance variable array is empty?
The code is as follows:
require 'faye/websocket'
module FitmoWebSockets
class ActivitiesNotifier
KEEPALIVE_TIME = 15 # in seconds
REDIS_CHANNEL = "worker-job"
def initialize(app)
puts "initializing FitmoWebSockets"
@app = app
@clientMgr = WebsocketClientManager.new
uri = URI.parse(ENV["REDIS_URL"])
@redis = Redis.new(host: uri.host, port: uri.port, password: uri.password)
Thread.new do
redis_sub = Redis.new(host: uri.host, port: uri.port, password: uri.password)
redis_sub.subscribe(REDIS_CHANNEL) do |on|
on.message do |channel, msg|
clients = @clientMgr.list_clients
puts "redis message received [channel: #{channel}][msg: #{msg}][clients(#{clients.class.name} #{clients.object_id}): #{clients.count}]"
clients.each {|ws| ws.send(msg) }
end
end
end
end
def call(env)
if Faye::WebSocket.websocket?(env)
ws = Faye::WebSocket.new(env, nil, {ping: KEEPALIVE_TIME })
ws.on :open do |event|
p [:open, ws.object_id]
@clientMgr.add(ws)
end
ws.on :message do |event|
p [:message, event.data]
#@redis.publish(CHANNEL, sanitize(event.data))
end
ws.on :close do |event|
p [:close, ws.object_id, event.code, event.reason]
@clientMgr.remove(ws)
ws = nil
end
# Return async Rack response
ws.rack_response
else
@app.call(env)
end
end
end
class WebsocketClientManager
def initialize
puts "initializing WebsocketClientManager"
@lock = Mutex.new
@clients = []
end
def add(ws)
@lock.synchronize {
@clients.push(ws)
puts "new client added [clients(#{@clients.class.name} #{@clients.object_id}): #{@clients.count}]"
}
end
def remove(ws)
@lock.synchronize {
@clients.delete(ws)
puts "client removed [clients(#{@clients.class.name} #{@clients.object_id}): #{@clients.count}]"
}
end
def list_clients
@lock.synchronize {
puts "listing clients [clients(#{@clients.class.name} #{@clients.object_id}): #{@clients.count}]"
@clients
}
end
end
end
The publish call that I'm calling in the the worker process is as follows
def self.perform(trainer_id, parent_activity_id, child_data)
puts "CreateUpdateSubActivityJob: performing task"
rescue => e
trainer = User.find_by_id(trainer_id)
notify_trainer(trainer, parent_activity_id, child_data)
raise
ensure
puts "CreateUpdateSubActivityJob: publishing redis event"
data = { "user" => trainer_id, "job" => "CreateUpdateSubActivityJob" }
$redis.publish 'worker-job', data.to_json
end