1

I have the following code to monitoring database table to create MQ messsage every several seconds. And I found that when I restarted the RabbitMQ server the app still runs without throwing an exception, and it still print message was created. So why it won't throw an exception? And another question is how to close the connect when I kill the app? Since it's a service, I have no place to write the code to close the RabbitMQ connection.

(require '[clojure.java.jdbc :as j])
(require '[langohr.core      :as rmq])
(require '[langohr.channel   :as lch])
(require '[langohr.queue     :as lq])
(require '[langohr.exchange  :as le])
(require '[langohr.consumers :as lc])
(require '[langohr.basic     :as lb])
(require '[clojure.data.json :as json])
(require '[clojure.java.io   :as io])

(defn load-props
  [file-name]
  (with-open [^java.io.Reader reader (io/reader file-name)]
    (let [props (java.util.Properties.)]
      (.load props reader)
      (into {} (for [[k v] props] [(keyword k) (read-string v)])))))

(def ^{:const true}
  default-exchange-name "")

(defn create-message-from-database
  ([channel qname db-spec]
   (let [select-sql "select a.file_id from file_store_metadata 
                     where processed=false "
         results (j/query db-spec select-sql)
         ]
     (doseq [row results]
       (let [file-id (:file_id row)]
         (lb/publish channel default-exchange-name qname (json/write-str row) {:content-type "text/plain" :type "greetings.hi" :persistent true})
         (j/update! db-spec :file_store_metadata {:processed true} ["file_id = ?" (:file_id row)])
         (println "message created for a new file id" (:file_id row))))
     ))
  )

(defn create-message-from-database-loop
  ([x channel qname db] (
                         while true
                          (Thread/sleep (* x 1000))
                          (create-message-from-database channel qname db)
                          ))
  )

(defn -main
  [& args]
  (let [
        properties (load-props "resource.properties")
        postgres-db (:database properties)
        rabbitmq-url (:rabbitmq properties)
        wake-up-interval (:interval properties)
        rmq-conn  (rmq/connect {:uri rabbitmq-url})
        ch    (lch/open rmq-conn)
        qname "etl scheduler"]
    (println "monitoring file store meta data on " postgres-db " every " wake-up-interval " seconds")
    (println "message will be created on rabbitmq server" rabbitmq-url)
    (lq/declare ch qname {:exclusive false :auto-delete false :persistent true})
    (create-message-from-database-loop wake-up-interval ch qname postgres-db)
    )
  )
Rodrigo Taboada
  • 2,727
  • 4
  • 24
  • 27
Daniel Wu
  • 5,853
  • 12
  • 42
  • 93
  • Hello @Daniel, why are you using two indentations styles on the question? Most people that uses Clojure prefer the style in the `load-props` function. I would recommend that for every project you pick one style and use it consistently. And I think you should be consistent in this question too. – Rodrigo Taboada Feb 04 '15 at 00:13
  • Do you have automatic topology recovery on? On your connection what does this kick out: (let [c (rmq/connect)] (rmq/automatic-recovery-enabled? c)) – Virmundi Feb 11 '15 at 14:08

0 Answers0