
Context: Clojure + RabbitMQ (via Langohr), a follow-up to this question.

I'm getting odd results when consuming messages from a RabbitMQ queue (I receive the messages through a direct exchange and, after processing each one, publish it to a fanout exchange). I don't understand why messages are handled on different threads during consumption (a thread switch happens every few messages).

The consumer starts in a separate thread (to prevent it from crashing the main thread if any IO exceptions happen), but that doesn't explain the switching.

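; Hypothetical namespace name; the aliases match the ones used below
; (cheshire is assumed to provide json/parse-string).
(ns tagger.core
  (:require [cheshire.core :as json]
            [clojure.string :as string]
            [langohr.core :as rmq]
            [langohr.channel :as lch]
            [langohr.queue :as lq]
            [langohr.exchange :as le]
            [langohr.consumers :as lcm]
            [langohr.basic :as lb])
  (:gen-class))

; pluck-tags and tags-to-csv are defined elsewhere (not shown).

; Message handler
(defn message-handler
  [pub-name ch metadata ^bytes payload]
  (let [msg     (json/parse-string (String. payload "UTF-8"))
        ;; msg is a map, so it can be called as a function of its keys
        content (string/join " " (map msg ["title" "link" "body"]))
        tags    (pluck-tags content)]
    (println (format "HANDLER in %s: Message: %s | found tags: %s"
                     (Thread/currentThread)
                     (msg "title")
                     (tags-to-csv tags)))
    ;; Publishing is disabled while debugging:
    ;; (lb/publish ch pub-name "" (json/generate-string (assoc msg "tags" (tags-to-csv tags))))
    nil))


(defn -main
  [& args]
  (let [conn        (rmq/connect {:uri (System/getenv "MSGQ")})
        ch          (lch/open conn)
        q-name      "q.events.tagger"
        e-sub-name  "e.events.preproc" ; direct exchange, not declared here (assumed to exist)
        e-pub-name  "e.events"         ; fanout exchange for processed messages
        routing-key "tasks.taggify"]
    (lq/declare ch q-name :exclusive false :auto-delete false)
    (le/declare ch e-pub-name "fanout" :durable false)
    (lq/bind ch q-name e-sub-name :routing-key routing-key)
    ;; Subscribe from a separate thread so IO exceptions don't crash the main thread
    (.start (Thread. (fn []
                       (lcm/subscribe ch q-name (partial message-handler e-pub-name)
                                      :auto-ack true))))))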

The message-handler just prints the current thread, the message title and the tags it found. This is what I get:

HANDLER in Thread[pool-1-thread-2,5,main]: Message: ...
HANDLER in Thread[pool-1-thread-2,5,main]: Message: ...
HANDLER in Thread[pool-1-thread-3,5,main]: Message: ... 
HANDLER in Thread[pool-1-thread-3,5,main]: Message: ...
HANDLER in Thread[pool-1-thread-3,5,main]: Message: ...
HANDLER in Thread[pool-1-thread-4,5,main]: Message: ...
HANDLER in Thread[pool-1-thread-4,5,main]: Message: ...
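The names in this log (pool-1-thread-N) are the ones the JDK's default thread factory gives to threads of a java.util.concurrent pool. The sketch below, which does not use RabbitMQ at all, shows that a fixed pool running callbacks strictly one after another still spreads them across several of its threads. Whether the RabbitMQ Java client version in use dispatches consumer callbacks through such a pool is an assumption here, not something the answers below state; `handle` and `dispatch-serially` are hypothetical names.

```clojure
(import '(java.util.concurrent Executors ExecutorService Callable))

;; Hypothetical stand-in for a client-side dispatcher: a fixed pool of
;; 4 threads using the JDK default thread factory ("pool-N-thread-M").
(def ^ExecutorService pool (Executors/newFixedThreadPool 4))

(defn handle
  "Stand-in for message-handler: records which thread ran it."
  [msg]
  {:msg msg :thread (.getName (Thread/currentThread))})

(defn dispatch-serially
  "Submits each message only after the previous one has finished, so no
  two handlers ever overlap, yet they are not pinned to one thread."
  [msgs]
  (doall
    (for [m msgs]
      ;; .get blocks until this message's handler has returned
      (.get (.submit pool ^Callable (fn [] (handle m)))))))

(def results (dispatch-serially (range 8)))
(.shutdown pool)

(doseq [{:keys [msg thread]} results]
  (println "HANDLER in" thread "message" msg))
```

A ThreadPoolExecutor starts a new worker for each task until it reaches its core size, and afterwards any idle worker may pick up the next task, so switches like the ones above appear even though handling is sequential.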

NOTE

I noticed this while playing with agents. I wanted to process each message on a CPU-bound thread pool and publish it on an unbounded (IO) thread pool. But after printing the current thread, I noticed that even without agents (or futures), messages are processed by different threads.
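A minimal sketch of the split described in this note, using only core Clojure: `send` runs agent actions on a fixed-size pool intended for CPU-bound work, and `send-off` runs them on an expandable pool intended for blocking IO. `tag-message` and `publish!` are hypothetical placeholders (publish! appends to an atom instead of calling Langohr), and one agent is created per message because a single agent runs its actions one at a time.

```clojure
;; Hypothetical CPU-bound step standing in for pluck-tags/tags-to-csv.
(defn tag-message [msg]
  (assoc msg "tags" "clojure,rabbitmq"))

;; Hypothetical IO step; a real version would publish to the fanout exchange.
(def published (atom []))

(defn publish! [msg]
  (swap! published conj msg)
  msg)

(defn handle-async
  "Tags msg on the send pool, then publishes it on the send-off pool."
  [msg]
  (let [a (agent msg)]
    (send a (fn [m]
              ;; A send-off issued inside an action is held until this
              ;; action returns, so it receives the tagged state.
              (send-off a (fn [tagged] (publish! tagged) tagged))
              (tag-message m)))
    a))
```

Call `(shutdown-agents)` before exiting, otherwise the agent thread pools keep the JVM alive.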

neektza
2 Answers


1) You have a fanout exchange there, which means the routing key is not used at all when routing messages: a fanout exchange routes messages to every queue bound to it. If you want to use routing keys, use either a direct or a topic exchange.

2) You always use the same queue name, which means your code is just adding multiple consumers to the same queue. As a result, RabbitMQ will round-robin messages among your consumers.
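Point 1 can be illustrated with a toy, in-memory model of how the two exchange types pick destination queues. This is not a RabbitMQ client: `route` and the binding maps are hypothetical, and the model covers only exact-match direct routing and fanout.

```clojure
;; Toy model of AMQP 0-9-1 routing; each binding is a hypothetical
;; map of {:queue queue-name :key binding-key}.
(defn route
  "Returns the names of the queues a message with routing-key reaches."
  [exchange-type bindings routing-key]
  (case exchange-type
    ;; direct: only bindings whose key equals the routing key
    :direct (->> bindings
                 (filter #(= routing-key (:key %)))
                 (map :queue)
                 distinct)
    ;; fanout: every bound queue; the routing key is ignored
    :fanout (distinct (map :queue bindings))))

(def bindings [{:queue "q.events.tagger" :key "tasks.taggify"}
               {:queue "q.other"         :key "tasks.other"}])

(route :direct bindings "tasks.taggify") ;; => ("q.events.tagger")
(route :fanout bindings "ignored")       ;; => ("q.events.tagger" "q.other")
```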

old_sound
  • The fanout exchange (e-pub-name) is used for publishing the processed messages. The direct exchange (e-sub-name) is used to listen for messages in the preproc queue. So the Clojure process is a middle-man. The big picture: **producers** => direct exchange --> preproc queue => the Clojure process (or multiple processors, depending on the load) => fanout exchange --> **consumers** – neektza Oct 13 '12 at 21:48
  • The Clojure process is meant to be scalable; that's why I use a direct exchange to listen: I just need to start up more processes and messages get distributed via round-robin. After processing a message, I just publish it to other consumers. – neektza Oct 13 '12 at 21:56
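The scaling plan in these comments relies on RabbitMQ spreading one queue's messages across all consumers subscribed to it. The toy model below shows the idealized round-robin order; `round-robin-dispatch` is a hypothetical helper, and real broker dispatch also depends on prefetch (basic.qos) and acknowledgements, so actual assignment can differ.

```clojure
;; Toy model: one queue, several competing consumers, strict round-robin.
;; Ignores prefetch, acks and redelivery.
(defn round-robin-dispatch
  "Returns a map of consumer -> vector of messages it would receive."
  [consumers messages]
  (reduce (fn [acc [consumer msg]]
            (update acc consumer (fnil conj []) msg))
          {}
          (map vector (cycle consumers) messages)))

(round-robin-dispatch [:tagger-1 :tagger-2] (range 5))
;; => {:tagger-1 [0 2 4], :tagger-2 [1 3]}
```

Adding a third tagger process in this model only adds a third entry to `consumers`; the queue and bindings stay the same, which is the property the comment above depends on.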

Author of Langohr here.

There must be something missing from the code. If you get this output with agents, the explanation is simple: Clojure agents (also futures and promises) use a thread pool. Langohr's langohr.consumers/subscribe and the underlying QueueingConsumer in the RabbitMQ Java client do not.
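The thread-pool behaviour of futures mentioned in this answer can be checked directly. This sketch only shows that a future's body never runs on the calling thread; `thread-name`, `caller` and `worker-names` are hypothetical names.

```clojure
(defn thread-name []
  (.getName (Thread/currentThread)))

(def caller (thread-name))

;; Each future body runs on a thread from Clojure's future/send-off pool,
;; never on the thread that created the future. Dereferencing blocks
;; until the body has finished.
(def worker-names
  (mapv (fn [_] @(future (thread-name))) (range 5)))
```

As with agents, call `(shutdown-agents)` when done so the pool's idle threads do not delay JVM exit.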

Michael Klishin
  • Thanks for the answer. I've edited the question (added the message-handler). So as you can see, no agents there. – neektza Oct 13 '12 at 21:46