
I'm building a small Ruby program that connects to an MQTT server and subscribes to a topic. I'm using the mosquitto gem, which is just a binding for the libmosquitto C library.

I created a very simple implementation that can be run with ruby my_prog.rb:

# Dependencies

require File.expand_path(File.join('..', 'environment'), __FILE__)


# MQTT Application

module Pulsr
    class MQTT
        attr_reader :host, :port, :alive

        def initialize(host = 'iot.eclipse.org', port = 1883, alive = 60)
            @client = Mosquitto::Client.new(SecureRandom.hex(8))

            Signal.trap(Signal.list.has_key?('INT') ? 'SIGINT' : 'SIGTERM') do
                @client.log 'Shutdown'
                shutdown
            end

            @host = host
            @port = port
            @alive = alive

            start
        end


        private

        def on_connect
            Proc.new { |return_code|
                @client.log "Connected RC #{return_code}"

                @client.subscribe(nil, '/pulsr', Mosquitto::EXACTLY_ONCE)
            }
        end

        def on_disconnect
            Proc.new { |return_code| @client.log "Disconnected RC #{return_code}" }
        end

        def on_subscribe
            Proc.new { |message_id, granted_qos| @client.log "Subscribed MID #{message_id} QoS #{granted_qos}" }
        end

        def on_unsubscribe
            Proc.new { |message_id| @client.log "Unsubscribed MID #{message_id}" }
        end

        def on_message
            Proc.new { |message| Pulsr::Workers::TrackingEvent.perform_async message.to_s }
        end

        def configure
            @client.logger = Logger.new(STDOUT)

            @client.on_connect &on_connect
            @client.on_disconnect &on_disconnect
            @client.on_subscribe &on_subscribe
            @client.on_unsubscribe &on_unsubscribe
            @client.on_message &on_message
        end

        def connect
            @client.connect_async(@host, @port, @alive)
        end

        def start
            @client.loop_start

            configure
            connect

            sleep
        end

        def shutdown
            @client.loop_stop(true)
            Process.exit
        end
    end
end


# MQTT Start

# initialize takes positional arguments, so pass them positionally
Pulsr::MQTT.new 'iot.eclipse.org', 1883, 60

I was wondering, if I wanted to use Celluloid or EventMachine to run the loops that the mosquitto gem provides, how would I do it?

The mosquitto gem has good documentation and presents a few loop methods that can be used, but I have no clue where to start or how to do it, and I have never used EM or Celluloid.

Could anyone help me get started with this? I think it could bring some value to the community, and it could end up as an open source project, a small addition to the mosquitto gem.

Roland
  • So, without any prior experience with EventMachine or MQTT, here are some thoughts. No matter what, you'll need some process running in the foreground, effectively waiting for messages from your server. I think choosing something like EventMachine depends on whether you can parallelize your message consumption from the topic across multiple threads. If you can't, then EventMachine won't do much for you. If you can, you'll probably need to find or implement a client similar to the AMQP adapter gem for EventMachine, otherwise you'll block I/O: https://github.com/ruby-amqp/amqp. Sounds like fun, though! – jaysqrd Jul 01 '14 at 05:15

2 Answers


I think it is not that hard. Mosquitto has a good library.

You need to connect these functions:

mosquitto_loop_misc() <-> EventMachine::PeriodicTimer.new
mosquitto_loop_read() <-> EventMachine.watch
mosquitto_loop_write() <-> EventMachine.watch
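
As a rough illustration of that mapping, here is a minimal sketch that uses only the Ruby standard library. IO.select stands in for EventMachine.watch, and a per-iteration call stands in for the periodic timer. FakeClient is a hypothetical stand-in, not the mosquitto gem: its method names (socket, loop_read, loop_write, loop_misc, want_write?) are assumptions modelled on libmosquitto's mosquitto_loop_read(), mosquitto_loop_write() and mosquitto_loop_misc(), so check what the gem actually exposes before adapting it.

```ruby
require 'socket'

# Hypothetical stand-in for a libmosquitto-backed client. It only mimics the
# shape of the loop hooks so the reactor below can run without a broker.
class FakeClient
  attr_reader :socket, :received, :misc_calls

  def initialize(socket)
    @socket     = socket
    @outbox     = []
    @received   = []
    @misc_calls = 0
  end

  # Queue a payload; it is sent once the reactor sees the socket is writable.
  def publish(payload)
    @outbox << payload
  end

  def want_write?
    !@outbox.empty?
  end

  # Called by the reactor when the socket is readable.
  def loop_read
    @received << @socket.read_nonblock(4096)
  rescue IO::WaitReadable, EOFError
    nil
  end

  # Called by the reactor when the socket is writable.
  def loop_write
    @socket.write_nonblock(@outbox.shift) if want_write?
  end

  # Housekeeping hook (keepalive, retries) in the real library; a counter here.
  def loop_misc
    @misc_calls += 1
  end
end

# Minimal reactor. Each branch corresponds to one line of the mapping:
# readable -> loop_read, writable -> loop_write, every iteration -> loop_misc.
def pump(client, ticks:, interval: 0.05)
  ticks.times do
    # Only watch for writability when there is something queued to send.
    writers = client.want_write? ? [client.socket] : []
    readable, writable = IO.select([client.socket], writers, nil, interval)
    client.loop_read  if readable && !readable.empty?
    client.loop_write if writable && !writable.empty?
    client.loop_misc
  end
end

# Usage: a socket pair plays the role of the broker connection.
broker_side, client_side = UNIXSocket.pair
client = FakeClient.new(client_side)
broker_side.write('hello')
client.publish('ping')
pump(client, ticks: 3)
p client.received
```

With EventMachine, the IO.select call would be replaced by EventMachine.watch on the client's socket (with readable and writable notifications enabled on the returned connection), and the loop_misc call would move into an EventMachine::PeriodicTimer.
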
jsaak
  • There is a twist with mosquitto_loop_write(): you either try to write periodically, or you watch a dedicated socket on which mosquitto sends a byte when it has something to write. However, this socket is not exposed to external functions. – jsaak Feb 16 '16 at 08:24

The em-mqtt gem provides an MQTT protocol implementation for EventMachine.
It uses the pure Ruby mqtt implementation to process the messages rather than libmosquitto.

If you really have to use libmosquitto for the protocol parsing via the mosquitto gem, the same split would hold: the EventMachine component stays pretty much as is, and all the calls to the protocol-specific MQTT module are replaced with their libmosquitto equivalents. The main problem looks to be that the libmosquitto public API, and consequently the Ruby API, hides all of this away in libmosquitto's own network implementation, which is exactly the part EventMachine would replace. You would have a lot of hacking to do to expose the required methods to Ruby before you could get started.

Matt
  • While your answer makes sense, it does not provide any example of how I could implement what I'm asking in the post. – Roland Aug 26 '14 at 13:36