I'm trying to get a better understanding of core.async and channels and such.
My task at hand is to issue a JDBC select statement against a database and stream the results onto a core.async channel.
I want a separate thread to take from this channel and write to a CSV file using clojure.data.csv.
When I run the program below, it doesn't appear to happen lazily: I get no output in the terminal for a while, then everything appears at once, and my CSV file has all 50 rows. I'm hoping someone can help me understand why.
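For context, the back pressure I'm relying on comes from the fixed buffer of `(chan 1)`: once it holds one value, the next put has to wait for a take. A quick REPL sketch of that, using the non-blocking `offer!` and `poll!` so nothing hangs (this assumes a core.async version recent enough to include them; `demo-ch` is just an illustrative name):

```clojure
(require '[clojure.core.async :as async])

;; A channel with a fixed buffer of one slot, like (chan 1) in the program below.
(def demo-ch (async/chan 1))

(async/offer! demo-ch :row-1) ; => true, the buffer had room
(async/offer! demo-ch :row-2) ; => nil, the buffer is full; (async/>!! demo-ch :row-2) would block here
(async/poll! demo-ch)         ; => :row-1, a take frees the slot
(async/offer! demo-ch :row-2) ; => true, there is room again
```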
Thanks in advance,
```clojure
(ns db-async-test.core-test
  (:require [clojure.java.jdbc :as j]
            [clojure.java.io :as io]
            [clojure.data.csv :as csv]
            [clojure.core.async :as async :refer [>! <! >!! <!! chan thread]]
            [clojure.string :as str]
            [while-let.core :refer [while-let]]))

(defn db->chan
  "Given input channel ch, sql select, and db-spec connection info, put db
  hash-maps onto ch in a separate thread. Through back pressure I'm hoping to
  populate the channel lazily as a consumer does downstream processing."
  [ch {:keys [sql db-spec]}]
  (println "starting fetch...")
  (let [row-count     (atom 0) ; For state on rows
        db-connection (j/get-connection db-spec)
        statement     (j/prepare-statement
                       db-connection
                       sql
                       {:result-type :forward-only ;; you need this to be lazy
                        :fetch-size  3             ;; also this
                        :max-rows    0
                        :concurrency :read-only})
        row-fn        (fn [d]
                        (>!! ch d)
                        ;; everything below is just for printing to stdout and
                        ;; trying to understand where my non-lazy bottleneck is.
                        (swap! row-count inc)
                        (when (zero? (mod @row-count 5))
                          #_(Thread/sleep 2000)
                          (println "\tFetched " @row-count " rows.")
                          (flush)))]
    (thread
      (j/query db-connection [statement]
               {:as-arrays?    false
                :result-set-fn vec
                :row-fn        row-fn})
      ;; as producer we finished populating the chan, now close in this same
      ;; thread.
      (println "producer closing channel... (hopefully you have written rows by now...")
      (async/close! ch))))

(defn chan->csv
  "With input channel ch and output file csv-file, read values off ch and write
  to csv file in a separate thread."
  [ch csv-file]
  (thread
    (println "starting csv write...")
    (let [row-count (atom 0)] ; local counter instead of a def inside the fn
      (with-open [^java.io.Writer writer (io/writer csv-file :append false :encoding "UTF-8")]
        (while-let [data (<!! ch)]
          (swap! row-count inc)
          (csv/write-csv writer [data] :quote? (constantly false))
          (when (zero? (mod @row-count 2))
            #_(Thread/sleep 2000)
            (println "Wrote " @row-count " rows.")
            (.flush writer)
            (flush)))))))

(def config {:db-spec {:classname   "org.postgresql.Driver"
                       :subprotocol "postgresql"
                       :subname     "//my-database-host:5432/mydb"
                       :user        "me"
                       :password    "****"}
             :sql "select row_id, giant_xml_column::text as xml_column from public.big_toasty_table limit 50"})

;; main sorta thing
(do
  (def ch (chan 1))
  (db->chan ch config)
  ;; could pipeline with transducers/etc at some point.
  (chan->csv ch "./test.csv"))
```
Here's some output, with my comments explaining when and how it comes out:
```
db-async-test.core-test>
;; this happens pretty quickly when I run it:
starting fetch...
starting csv write...
;; then it waits 30 seconds and spits out all the output below... it's not
;; "streaming" through lazily?
Wrote 2 rows.
Fetched 5 rows.
Wrote 4 rows.
Wrote 6 rows.
Wrote 8 rows.
...clip...
Wrote 44 rows.
Wrote 46 rows.
Wrote 48 rows.
Fetched 50 rows.
producer closing channel... (hopefully you have written rows by now...
Wrote 50 rows.
```
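To separate core.async's behavior from the JDBC driver's, here is a sketch of the same producer/consumer shape with the query swapped for `(range 10)`. The `run-pipeline` function and its event log are hypothetical, purely for illustration. If the takes interleave with the puts here, the channel and back pressure are behaving as expected, which would suggest the initial delay lies outside core.async.

```clojure
(ns backpressure-demo
  (:require [clojure.core.async :refer [chan thread >!! <!! close!]]))

(defn run-pipeline
  "Hypothetical stand-in for db->chan + chan->csv: a producer thread puts
  0..n-1 (instead of database rows) onto a 1-slot channel and a consumer
  thread takes them off. Returns the ordered log of [:put v]/[:take v] events."
  [n]
  (let [ch       (chan 1)                    ; same buffer size as in the question
        events   (atom [])                   ; shared, ordered event log
        log!     (fn [kind v] (swap! events conj [kind v]))
        producer (thread
                   (doseq [i (range n)]
                     (>!! ch i)              ; blocks while the buffer is full
                     (log! :put i))
                   (close! ch))              ; lets the consumer's loop end
        consumer (thread
                   (loop []
                     (when-some [v (<!! ch)] ; nil means closed and drained
                       (log! :take v)
                       (recur))))]
    (<!! producer)                           ; wait for both threads to finish
    (<!! consumer)
    @events))

(let [events (run-pipeline 10)]
  (println events)
  ;; With a 1-slot buffer, put 9 cannot complete until the consumer has
  ;; taken value 8, so the first take is always logged before the last put.
  (println "first take before last put?"
           (< (.indexOf ^java.util.List events [:take 0])
              (.indexOf ^java.util.List events [:put 9]))))
```

The exact interleaving printed will vary between runs; only the ordering guarantee in the comment holds every time.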