I'm having the same issue as the poster of "Accessing a variable within a rails thread", but the answer there does not actually resolve the problem; it only describes how to test for it.

I'm following the same Heroku article at https://devcenter.heroku.com/articles/ruby-websockets

I'm building a feature into our web app that will notify client browsers when background processes complete.

The basic flow is that a user initiates a long-running process by submitting to a REST service. This service spins off a Resque background process and sends a response back to the client app saying that the process has started. When the background process has completed, the server sends a websocket message to the client to notify the user.

What I have in place is a Rack middleware class that listens for websocket connections and adds each connection to an instance variable array. If the request is not a websocket connection, the call is passed down the middleware chain.

The middleware class also subscribes to a Redis pub/sub channel so that it can be notified when background processes complete. The subscribe block should then iterate over the clients in the instance variable array and send a websocket message to each one. However, as pointed out in the other post, the client array is empty inside the subscribe block.

I've already taken steps to make it thread safe by wrapping access to the instance variable with a mutex.

I can see that the clients array grows when a browser connects, and that clients are removed properly on disconnect. Additionally, I can see that my pub/sub code is working fine.

I've gone through the log messages and I don't see anything wrong, such as multiple instances being created or connections being deleted.

Some investigation and discussion suggested that the variable would not be available inside the thread, but I don't believe that is true. We are not forking here, and forking is what would result in a new memory space.
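To illustrate that claim, here is a minimal sketch in plain Ruby, independent of Rack, Redis, and faye-websocket. The class name `Registry` and its methods are hypothetical and exist only for this example. It starts a thread in `initialize`, mutates the instance variable afterwards from the main thread, and then asks the thread what it sees. This only covers threads within a single process; it says nothing about what happens if the server runs several worker processes.

```ruby
require 'thread'

# Hypothetical stand-in for the middleware: a thread created in
# initialize reads an instance variable that is modified later.
class Registry
  def initialize
    @lock    = Mutex.new
    @clients = []
    @signal  = Queue.new # main thread -> background thread
    @result  = Queue.new # background thread -> main thread

    # The block closes over self, so @clients refers to the same array.
    Thread.new do
      @signal.pop # wait until the main thread asks for a count
      @result << @lock.synchronize { @clients.size }
    end
  end

  def add(client)
    @lock.synchronize { @clients << client }
  end

  # Ask the background thread how many clients it can see.
  def count_seen_by_thread
    @signal << :go
    @result.pop
  end
end

registry = Registry.new
registry.add(:ws1) # added after the thread was created
seen = registry.count_seen_by_thread
puts "clients seen from the thread: #{seen}"
```

Within one process, the thread observes the client that was added after it started, which matches the expectation that threads share instance variables.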

Any idea why the instance variable array is empty?

The code is as follows:

require 'faye/websocket'

module FitmoWebSockets
  class ActivitiesNotifier
    KEEPALIVE_TIME = 15 # in seconds
    REDIS_CHANNEL = "worker-job"

    def initialize(app)
      puts "initializing FitmoWebSockets"
      @app        = app
      @clientMgr  = WebsocketClientManager.new

      uri = URI.parse(ENV["REDIS_URL"])
      @redis = Redis.new(host: uri.host, port: uri.port, password: uri.password)
      Thread.new do
        redis_sub = Redis.new(host: uri.host, port: uri.port, password: uri.password)
        redis_sub.subscribe(REDIS_CHANNEL) do |on|
          on.message do |channel, msg|
            clients = @clientMgr.list_clients
            puts "redis message received [channel: #{channel}][msg: #{msg}][clients(#{clients.class.name} #{clients.object_id}): #{clients.count}]"
            clients.each {|ws| ws.send(msg) }
          end
        end
      end
    end

    def call(env)
      if Faye::WebSocket.websocket?(env)

        ws = Faye::WebSocket.new(env, nil, {ping: KEEPALIVE_TIME })
        ws.on :open do |event|
          p [:open, ws.object_id]
          @clientMgr.add(ws)
        end

        ws.on :message do |event|
          p [:message, event.data]
          #@redis.publish(CHANNEL, sanitize(event.data))
        end

        ws.on :close do |event|
          p [:close, ws.object_id, event.code, event.reason]
          @clientMgr.remove(ws)
          ws = nil
        end

        # Return async Rack response
        ws.rack_response
      else
        @app.call(env)
      end
    end
  end

  class WebsocketClientManager

    def initialize
      puts "initializing WebsocketClientManager"
      @lock = Mutex.new
      @clients = []
    end

    def add(ws)
      @lock.synchronize {
        @clients.push(ws)
        puts "new client added [clients(#{@clients.class.name} #{@clients.object_id}): #{@clients.count}]"
      }
    end

    def remove(ws)
      @lock.synchronize {
        @clients.delete(ws)
        puts "client removed [clients(#{@clients.class.name} #{@clients.object_id}): #{@clients.count}]"
      }
    end

    def list_clients
      @lock.synchronize {
        puts "listing clients [clients(#{@clients.class.name} #{@clients.object_id}): #{@clients.count}]"
        @clients
      }
    end
  end
end
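A side note on the mutex: `list_clients` hands back the live array, so the caller iterates over it after the lock has been released. A minimal sketch of a variant that returns a copy instead is below. `SnapshotClientManager` is a hypothetical name, not the class posted above, and this change would not by itself explain why the array appears empty.

```ruby
require 'thread'

# Hypothetical variant of the client manager that returns snapshots.
class SnapshotClientManager
  def initialize
    @lock    = Mutex.new
    @clients = []
  end

  def add(ws)
    @lock.synchronize { @clients.push(ws) }
  end

  def remove(ws)
    @lock.synchronize { @clients.delete(ws) }
  end

  # Return a shallow copy so callers can iterate without holding the
  # lock and without seeing concurrent add/remove calls mid-iteration.
  def list_clients
    @lock.synchronize { @clients.dup }
  end
end

manager = SnapshotClientManager.new
manager.add(:a)
manager.add(:b)
snapshot = manager.list_clients
manager.remove(:a) # does not affect the snapshot taken earlier
puts "snapshot size: #{snapshot.size}, current size: #{manager.list_clients.size}"
```

The copy is shallow, so the websocket objects themselves are still shared; only the array is duplicated.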

The publish call that I'm making in the worker process is as follows:

def self.perform(trainer_id, parent_activity_id, child_data)
  puts "CreateUpdateSubActivityJob: performing task"

rescue => e
  trainer = User.find_by_id(trainer_id)
  notify_trainer(trainer, parent_activity_id, child_data)
  raise

ensure
  puts "CreateUpdateSubActivityJob: publishing redis event"
  data = { "user" => trainer_id, "job" => "CreateUpdateSubActivityJob" }
  $redis.publish 'worker-job', data.to_json
end
ikbenben
  • (Disclaimer, I'm the author) Did you look into [plezi.io](http://www.plezi.io)? it will automate the websocket collections and the Redis sync. It can also be used as middleware in your application. It requires Iodine, so that websockets are handled by the server (unlike the socket `hijacking` currently employed by most solutions)... – Myst Sep 11 '16 at 06:19
  • Did you consider using `Thread.new(@clientMgr) do |cl_manager| #...` to avoid scoping considerations? – Myst Sep 11 '16 at 06:23
  • @Myst I hadn't tried that. You mean replacing the `Thread.new do` with this? I didn't think of that; I assumed that the instance variable would be accessible in different threads. We have actually switched to Pusher.com instead, as it was a more straightforward solution that didn't require threading. I'd still like to try this out when I get time. Will try your solution and see if it makes a difference – ikbenben Sep 11 '16 at 18:41
