
I'm trying to get a better understanding of core.async and channels and such.

My task at hand is to issue a jdbc select statement on a database and stream the results onto an async channel.

I want a separate thread to take from this channel and write to a csv file using clojure.data.csv.

When running the program below, it appears it's not happening lazily... I'm getting no output to the terminal, then everything appears at once, and my csv file has 50 rows. I'm hoping someone can help me understand why.

Thanks in advance,

(ns db-async-test.core-test
  (:require [clojure.java.jdbc  :as j]
            [clojure.java.io    :as io]
            [clojure.data.csv   :as csv]
            [clojure.core.async :as async :refer [>! <! >!! <!!  chan thread]]
            [clojure.string     :as str]
            [while-let.core     :refer [while-let]]))


(defn db->chan
  "Given input channel ch, an sql select, and db-spec connection info, put db
  hash-maps onto ch in a separate thread. Through back pressure I'm hoping to
  populate the channel lazily as a consumer does downstream processing."
  [ch {:keys [sql db-spec]}]
  (println "starting fetch...")
  (let [
        row-count           (atom 0)  ; For state on rows
        db-connection       (j/get-connection db-spec)
        statement (j/prepare-statement
                   db-connection
                   sql {
                        :result-type :forward-only  ;; you need this to be lazy
                        :fetch-size 3               ;; also this
                        :max-rows   0
                        :concurrency :read-only})
        row-fn (fn [d]
                 (>!! ch d)
                 ;; everything below is just for printing to stdout and
                 ;; trying to understand where my non-lazy bottleneck is.
                 (swap! row-count inc)
                 (when (zero? (mod @row-count 5))
                   #_(Thread/sleep 2000)
                   (println "\tFetched " @row-count " rows.")
                   (flush)))]
    (thread
      (j/query db-connection [statement]
               {:as-arrays?    false
                :result-set-fn vec
                :row-fn row-fn
                })
      ;; as the producer we have finished populating the chan, now close it in
      ;; this same thread.
      (println "producer closing channel... (hopefully you have written rows by now...")
      (async/close! ch))))


(defn chan->csv
  "With input channel ch and output file csv-file, read values off ch and write
  to a csv file in a separate thread."
  [ch csv-file]
  (thread
    (println "starting csv write...")
    (let [row-count (atom 0)]
      (with-open [^java.io.Writer writer (io/writer csv-file :append false :encoding "UTF-8")]
        (while-let [data (<!! ch)]
          (swap! row-count inc)
          (csv/write-csv writer [data] :quote? (fn [x] false))
          (when (zero? (mod @row-count 2))
            #_(Thread/sleep 2000)
            (println "Wrote " @row-count " rows.")
            (.flush writer)
            (flush)))))))
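For anyone wondering about `while-let`, which comes up in the comments below: it's from the markmandel/while-let library in my project.clj. As I understand it, it simply keeps looping while the bound expression is truthy, which is what stops the writer once the channel is closed and `<!!` starts returning nil. Roughly something like this, as a sketch of my understanding rather than the library's actual source:

(defmacro while-let
  "Sketch only (my approximation, not the library's source): rebind sym to
  expr on each pass and stop once it is falsey, e.g. when a closed core.async
  channel starts returning nil."
  [[sym expr] & body]
  `(loop []
     (when-let [~sym ~expr]
       ~@body
       (recur))))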

(def config {:db-spec {:classname "org.postgres.Driver"
                       :subprotocol "postgres"
                       :subname "//my-database-host:5432/mydb"
                       :user "me"
                       :password "****"}
             :sql "select row_id, giant_xml_column::text as xml_column from public.big_toasty_table limit 50"})

;; main sorta thing
(do
  (def ch (chan 1))
  (db->chan ch config)
  ;; could pipeline with transducers/etc at some point.
  (chan->csv ch "./test.csv"))

Here's some output, with my comments explaining when/how it comes out:

db-async-test.core-test>
;; this happens pretty quick when i run:
starting fetch...
starting csv write...

;; then it waits 30 seconds, and spits out all the output below... it's not
;; "streaming" through lazily?
Wrote  2  rows.
    Fetched  5  rows.
Wrote  4  rows.
Wrote  6  rows.
Wrote  8  rows.
...clip...
Wrote  44  rows.
Wrote  46  rows.
Wrote  48  rows.
    Fetched  50  rows.
producer closing channel... (hopefully you have written rows by now...
Wrote  50  rows.
  • What does `while-let` do? Please share the code. – mavbozo Nov 10 '17 at 09:01
  • I pulled it in from my project.clj: https://github.com/markmandel/while-let – joefromct Nov 10 '17 at 15:50
  • Might be jdbc weirdness with that db/driver specifically. Have had issues in the past where `clojure.java.jdbc` wouldn't behave as expected because of db differences. Would have to drop down a level to better configure the jdbc stuff. See: https://stackoverflow.com/a/39775018/4351017 for what seems to be the related PostgreSQL issue. – alvinfrancis Nov 13 '17 at 15:14
  • Thanks for the response. I think based on your posted link the only solution would be to put all functionality into the `row-fn`... as I'm following all the other rules... and `vec` (my `:result-set-fn`) should be lazy. – joefromct Nov 13 '17 at 18:36
  • As I've been tinkering with this I've come across [this](http://funcool.github.io/clojure.jdbc/latest/api/jdbc.resultset.html#var-result-set-.3Elazyseq), which seems to be exactly what I need... if I get it to work I'll post a solution. I need to understand the trade-offs of going with `funcool/clojure.jdbc` vs `clojure.java.jdbc`... – joefromct Nov 13 '17 at 18:36
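For reference, the PostgreSQL issue linked in the comments above comes down to auto-commit: the driver only streams a forward-only result set with `:fetch-size` when the connection is not in auto-commit mode, otherwise it buffers the whole result. A minimal sketch (untested, with a hypothetical `streaming-statement` helper) of that workaround with the original clojure.java.jdbc setup, using plain java.sql.Connection interop:

;; Sketch only: same statement setup as in the question, but with auto-commit
;; turned off on the connection so the PostgreSQL driver can use a server-side
;; cursor instead of buffering the whole result set.
(defn streaming-statement [db-spec sql]
  (let [db-connection (doto (j/get-connection db-spec)
                        (.setAutoCommit false))   ; java.sql.Connection interop
        statement     (j/prepare-statement
                       db-connection
                       sql
                       {:result-type :forward-only
                        :fetch-size  3
                        :concurrency :read-only})]
    statement))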

1 Answer


Ok, I think I have something that works for me.

The main fix was to swap out org.clojure/java.jdbc and replace it with funcool/clojure.jdbc in my project.clj.

What funcool/clojure.jdbc gives me is access to result-set->lazy-seq.

New ns:

(ns db-async-test.core-test
  (:require [jdbc.core :as j]
            [while-let.core :refer [while-let]]
            [clojure.java.io :as io]
            [clojure.data.csv :as csv]
            [clojure.core.async :as a :refer [>!! <!! chan thread]]
            [clojure.string :as str]))

Below is the relevant code. Based on what I can tell, this streams things through with async channels. I should be able to play around with async reducers/transducers using this, hopefully over pretty large volumes (especially with map-like functions) if I'm patient; a sketch of that idea appears after the main block below.

Function for reader thread:

(defn db->chan
  "Put db hash-maps onto ch."
  [ch {:keys [sql db-spec]}]
  (println "starting reader thread...")
  (let [row-count (atom 0)  ; for state on rows
        row-fn    (fn [r]
                    (>!! ch r)
                    ;; everything below is just for printing to stdout
                    (swap! row-count inc)
                    (when (zero? (mod @row-count 100))
                      (println "Fetched " @row-count " rows.")))]
    (with-open [conn (j/connection db-spec)]
      (j/atomic conn
                (with-open [cursor (j/fetch-lazy conn sql)]
                  (doseq [row (j/cursor->lazyseq cursor)]
                    (row-fn row)))))
    (a/close! ch)))

Function for writer thread:

(defn chan->csv
  "Read values off ch and write to a csv file."
  [ch csv-file]
  (println "starting writer thread...")
  (let [row-count (atom 0)]
    (with-open [^java.io.Writer writer (io/writer csv-file
                                                  :append false :encoding "UTF-8")]
      (while-let [data (<!! ch)]
        (swap! row-count inc)
        (csv/write-csv writer [data] :quote? (fn [x] false))
        (when (zero? (mod @row-count 100))
          (println "Wrote " @row-count " rows."))))))

I put the thread parts below rather than in the individual functions:

(def config {:db-spec {:subprotocol "postgresql"
                       :subname "//mydbhost:5432/mydb"
                       :user "me"
                       :password "*****"}
             :sql "select row_id, giant_xml_value::text from some_table"})

(do
  (def ch (chan 1))
  (thread (db->chan ch config))
  (thread (chan->csv ch "./test.csv")))
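As for the transducer idea mentioned above, here's a sketch (untested) of how core.async's `pipeline` could sit between the two threads; `clean-row`, `db-ch`, and `csv-ch` are hypothetical names standing in for real per-row work and the two channels:

;; Sketch only: a/pipeline runs the (map ...) transducer on n worker threads
;; between the reader channel and the writer channel, preserving order, and
;; closes csv-ch once db-ch closes.
(defn clean-row [row]
  ;; hypothetical placeholder transform; replace with real per-row work
  row)

(do
  (def db-ch  (chan 1))
  (def csv-ch (chan 1))
  (a/pipeline 4 csv-ch (map clean-row) db-ch)
  (thread (db->chan db-ch config))
  (thread (chan->csv csv-ch "./test.csv")))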

Output is below; it looks like both threads are working at the same time, streaming data out onto the channel and popping it off that channel into the csv.
Also, even with the giant xml column my system still isn't using a huge amount of memory.

starting fetch...
starting csv write...
Fetched  100 Wrote  rows.
100
  rows.
Fetched  200Wrote    rows.200

 rows.
Fetched  300  rows.
Wrote
...clip....
6000  rows.
Fetched  6100  rows.
Wrote  6100  rows.
Fetched  6200  rows.
Wrote  6200  rows.
Fetched  6300  rows.
Wrote
 6300  rows.
Fetched  6400Wrote    rows.6400

 rows.
Fetched  6500  rows.Wrote

6500  rows.