I have the following code that monitors a database table and creates an MQ message every few seconds. I found that when I restart the RabbitMQ server, the app keeps running without throwing an exception, and it still prints that a message was created. Why doesn't it throw an exception? My second question is how to close the connection when I kill the app: since it runs as a service, I don't see a place to put the code that closes the RabbitMQ connection.
(require '[clojure.java.jdbc :as j])
(require '[langohr.core :as rmq])
(require '[langohr.channel :as lch])
(require '[langohr.queue :as lq])
(require '[langohr.exchange :as le])
(require '[langohr.consumers :as lc])
(require '[langohr.basic :as lb])
(require '[clojure.data.json :as json])
(require '[clojure.java.io :as io])
(defn load-props
  "Loads the Java properties file at `file-name` and returns a map whose keys
  are the property names as keywords and whose values are the property strings
  parsed with `read-string` (so `5` becomes a number and `\"abc\"` a string).

  `*read-eval*` is bound to false while parsing so that a `#=(...)` form in
  the properties file cannot execute arbitrary code; such a value now raises
  an exception instead of being evaluated."
  [file-name]
  (with-open [^java.io.Reader reader (io/reader file-name)]
    (let [props (java.util.Properties.)]
      (.load props reader)
      ;; `into` realizes the lazy `for` inside the binding, so every
      ;; read-string call sees *read-eval* = false.
      (binding [*read-eval* false]
        (into {} (for [[k v] props] [(keyword k) (read-string v)]))))))
;; Name of the exchange messages are published to: the empty string.
(def ^:const default-exchange-name
  "")
(defn create-message-from-database
  "Publishes one message per unprocessed row of `file_store_metadata` and then
  marks that row as processed.

  channel - open Langohr channel used for publishing
  qname   - routing key passed to `lb/publish` (the target queue name)
  db-spec - clojure.java.jdbc database spec

  Each row is serialized to JSON and published to `default-exchange-name`.
  NOTE(review): publishing is fire-and-forget here (no publisher confirms, no
  mandatory flag), so a successful return and the printed line do not prove
  that the broker stored the message - confirm whether delivery guarantees
  are required."
  ([channel qname db-spec]
   ;; The original query selected `a.file_id` without declaring alias `a`,
   ;; which the database rejects; select the column directly.
   (let [select-sql "select file_id from file_store_metadata where processed=false"
         results (j/query db-spec select-sql)]
     (doseq [row results]
       (let [file-id (:file_id row)]
         (lb/publish channel default-exchange-name qname (json/write-str row)
                     {:content-type "text/plain" :type "greetings.hi" :persistent true})
         ;; Mark the row only after the publish call returned without throwing.
         (j/update! db-spec :file_store_metadata {:processed true} ["file_id = ?" file-id])
         (println "message created for a new file id" file-id))))))
(defn create-message-from-database-loop
  "Runs forever: sleeps `x` seconds, then calls
  `create-message-from-database` with `channel`, `qname` and `db`, and repeats.
  Never returns normally; it ends only when an exception propagates out."
  ([x channel qname db]
   (loop []
     ;; x is in seconds; Thread/sleep takes milliseconds.
     (Thread/sleep (* x 1000))
     (create-message-from-database channel qname db)
     (recur))))
(defn -main
  "Entry point of the service.

  Reads `resource.properties`, connects to RabbitMQ, declares the queue
  `etl scheduler` and then polls the database forever via
  `create-message-from-database-loop`.

  The channel and connection are closed both when the polling loop exits with
  an exception and when the JVM shuts down (for example on SIGTERM / Ctrl-C),
  through a shutdown hook. A hard kill (SIGKILL) cannot be intercepted."
  [& args]
  (let [properties       (load-props "resource.properties")
        postgres-db      (:database properties)
        rabbitmq-url     (:rabbitmq properties)
        wake-up-interval (:interval properties)
        rmq-conn         (rmq/connect {:uri rabbitmq-url})
        ch               (lch/open rmq-conn)
        qname            "etl scheduler"
        ;; Closes one channel/connection, ignoring errors: during shutdown the
        ;; resource may already be closed by the broker or by the other path.
        close-quietly    (fn [resource]
                           (try
                             (when (rmq/open? resource)
                               (rmq/close resource))
                             (catch Exception _ nil)))
        close-all        (fn []
                           (close-quietly ch)
                           (close-quietly rmq-conn))]
    ;; A service has no natural "exit" point, so register the cleanup with the
    ;; JVM; it runs on normal exit and on termination signals.
    (.addShutdownHook (Runtime/getRuntime) (Thread. ^Runnable close-all))
    (println "monitoring file store meta data on " postgres-db " every " wake-up-interval " seconds")
    (println "message will be created on rabbitmq server" rabbitmq-url)
    (try
      ;; `:persistent` is not a queue-declare option; `:durable true` is what
      ;; makes the queue survive a broker restart. Without it the queue is gone
      ;; after a restart and messages sent to it are dropped silently.
      ;; NOTE: if a non-durable queue with this name already exists on the
      ;; broker, delete it first - redeclaring with different attributes fails.
      (lq/declare ch qname {:exclusive false :auto-delete false :durable true})
      (create-message-from-database-loop wake-up-interval ch qname postgres-db)
      (finally
        (close-all)))))