Context: Clojure + RabbitMQ (via Langohr), a follow up to this question.
I'm getting weird results in consuming messages from a RabbitMQ mq (getting the messages from a direct exchange and publishing to a fanout exchange after processing the message). I don't understand why messages end up in separate threads during consumption (every few messages a thread switch happens).
The consumer starts in a separate thread (to prevent it from crashing the main thread if any IO exceptions happen), but that doesn't explain the switching.
; Message handler
(defn message-handler
[pub-name ch metadata ^bytes payload]
(let [msg (json/parse-string (String. payload "UTF-8"))
content (string/join " " (map msg '("title" "link" "body")))
tags (pluck-tags content)]
(println (format "HANDLER %s: Message: %s | found tags: %s"
(Thread/currentThread)
(msg "title")
(tags-to-csv tags)))
(nil)))
; (lb/publish ch pub-name "" (json/generate-string (assoc msg "tags" (tags-to-csv tags))))))
(defn -main
[& args]
(let [conn (rmq/connect {:uri (System/getenv "MSGQ")})
ch (lch/open conn)
q-name "q.events.tagger"
e-sub-name "e.events.preproc"
e-pub-name "e.events"
routing-key "tasks.taggify"]
(lq/declare ch q-name :exclusive false :auto-delete false)
(le/declare ch e-pub-name "fanout" :durable false)
(lq/bind ch q-name e-sub-name :routing-key routing-key)
(.start (Thread. (fn []
(lcm/subscribe ch q-name (partial message-handler e-pub-name) :auto-ack true))))))
The message-handler just prints out the current thread and the payload received. This is what I get:
HANDLER in Thread[pool-1-thread-2,5,main]: Message: ...
HANDLER in Thread[pool-1-thread-2,5,main]: Message: ...
HANDLER in Thread[pool-1-thread-3,5,main]: Message: ...
HANDLER in Thread[pool-1-thread-3,5,main]: Message: ...
HANDLER in Thread[pool-1-thread-3,5,main]: Message: ...
HANDLER in Thread[pool-1-thread-4,5,main]: Message: ...
HANDLER in Thread[pool-1-thread-4,5,main]: Message: ...
NOTE
I noticed this during playing with agents. I wanted to process each message in its own CPU bound thread-pool, and publish it in a unbounded (IO) thread-pool. But, after printing out the current thread I noticed that even without using agents (or futures), messages get processed by different threads.