I use a Java library that makes blocking (non-async) GET and POST requests. I have been wrapping such requests in futures, which solves the "waiting problem" for me (I mean waiting for the response):

(defn unchangeable-lib-request [n]
  (Thread/sleep 1000)
  n)

(defn process [n]
  (let [res (atom [])]
    (dotimes [i n]
      (future (swap! res conj (unchangeable-lib-request i))))
    (loop []
      (if (> n (count @res))
        (recur)
        @res))))

(time (process 9))

;; "Elapsed time: 1000.639079 msecs"
;; => [8 7 5 6 4 3 2 1 0]

But I need to make hundreds of requests, and creating that many futures causes performance problems. I found out about core.async and go blocks, but if I use go blocks with this library, they do not solve the "waiting problem":

(require '[clojure.core.async :as async])

(defn unchangeable-lib-request [n]
  (Thread/sleep 1000)
  n)

(defn process [n]
  (let [c (async/chan 10)]
    (dotimes [i n]
      (async/go
        (async/>! c (unchangeable-lib-request i))))
    (loop [result []]
      (if (> n (count result))
        (recur (conj result (async/<!! c)))
        result))))

(time (process 9))

;; "Elapsed time: 2001.770183 msecs"
;; => [0 4 1 6 7 2 5 3 8]

Go blocks can handle only 8 requests at a time (the go-block thread pool has 8 threads by default, and each blocking call ties one up). Is it possible to write an async wrapper that parks the go block, so that hundreds of requests can run asynchronously without blocking each other?

;; magic-async-parking-wrapper is hypothetical; it is what I am asking for
(defn process [n]
  (let [c (async/chan 10)]
    (dotimes [i n]
      (async/go
        (async/>! c (magic-async-parking-wrapper
                      (unchangeable-lib-request i)))))
    (loop [result []]
      (if (> n (count result))
        (recur (conj result (async/<!! c)))
        result))))

(time (process 9))

;; "Elapsed time: 1003.2563 msecs"

I know about async/thread, but it seems to be the same as (future ...).

Is it possible?

Defake
  • Have you read [this](https://stackoverflow.com/questions/46517489/allocate-thread-per-request-clojure/46520878#46520878)? – akond Oct 20 '17 at 13:29
  • What are you pointing to? I didn't find anything useful there. You mean that using (Thread/sleep _) is not a good idea? So, that is the point. I already have a blocking operation and can't change it. And I need to do something with it – Defake Oct 21 '17 at 07:57

2 Answers

I'd suggest:

  • Use futures to create the threads, and have them put their results onto a core.async channel from outside of any go block using put!, something like: (future (put! chan (worker-function)))
  • Then use a go block to wait on that (single) channel and collect the results as they arrive.
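
A minimal sketch of the two steps above, assuming the `unchangeable-lib-request` stand-in from the question and a channel buffer sized to the number of requests (both are illustrative choices, not part of the original answer). Note that this still starts one thread per request, so it changes how results are collected rather than how many threads are used:

```clojure
(require '[clojure.core.async :as async])

;; Stand-in for the blocking Java call, copied from the question.
(defn unchangeable-lib-request [n]
  (Thread/sleep 1000)
  n)

(defn process [n]
  (let [c (async/chan (max 1 n))]
    ;; One future (one thread) per request; put! does not block the future's thread.
    (dotimes [i n]
      (future (async/put! c (unchangeable-lib-request i))))
    ;; A single go block parks on the channel until n results have arrived.
    (async/<!!
      (async/go-loop [result []]
        (if (< (count result) n)
          (recur (conj result (async/<! c)))
          result)))))
```

Calling `(process 9)` returns a vector of the nine results in arrival order, which can differ from run to run.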
tarmes
  • Hm, why `put!` instead of `>!!`? I don't totally understand the difference between the two, but I think the point of `put!` is to allow you to do fancier error handling if the channel has no buffer space. Since you just want to wait for space, I would have expected `>!!`. – amalloy Oct 20 '17 at 16:58
  • What is the difference between my first implementation and your answer? If I want to make 100 requests at once, the program will create 100 futures, each waiting for its own response. This is not asynchronous. Am I misunderstanding something? – Defake Oct 21 '17 at 08:07
  • @amalloy The difference is that if the channel `c` has no space, `>!!` will block the thread and wait for an opportunity to put a value into `c`, whereas `put!` will not block the thread and will put the value asynchronously when there is an opportunity. But yes, it seems that in this case there is no difference. – Defake Oct 25 '17 at 19:04
  • `put!` can be used from outside of a `go` block, whereas the arrow functions can’t. – tarmes Oct 25 '17 at 19:23
  • @tarmes The `>!` and `<!` arrow functions can't be used from outside of a `go` block. `>!!` and `<!!` can be, but they will block threads. – Defake Oct 26 '17 at 03:31

This is where you use clojure.core.async/pipeline-blocking

(require '[clojure.core.async :as a :refer [chan pipeline-blocking]])

(let [output-chan (chan 100)
      input-chan (chan 1000)]
  (pipeline-blocking 4 ; parallelism knob
                     output-chan
                     (map unchangeable-lib-request)
                     input-chan)
  ;; Consume results from output-chan, put operations on input-chan
  [output-chan input-chan])

This spawns n (in this case 4) threads that are kept busy executing unchangeable-lib-request.

Use the buffer size of output-chan to fine-tune how many requests may complete ahead of the consumer.

Use the buffer size of input-chan to fine-tune how many requests you want scheduled without backpressure (a blocking input-chan).
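
As a rough end-to-end sketch of the above, the following feeds `n` requests through `pipeline-blocking` and collects the results. The `process` signature and the buffer sizes (one slot per request, so the puts never block) are assumptions made for illustration; `unchangeable-lib-request` is the stand-in from the question:

```clojure
(require '[clojure.core.async :as a])

;; Stand-in for the blocking Java call, copied from the question.
(defn unchangeable-lib-request [n]
  (Thread/sleep 1000)
  n)

(defn process
  "Runs n requests on `parallelism` worker threads and returns the results."
  [parallelism n]
  (let [input-chan  (a/chan (max 1 n))
        output-chan (a/chan (max 1 n))]
    (a/pipeline-blocking parallelism
                         output-chan
                         (map unchangeable-lib-request)
                         input-chan)
    ;; Schedule every request; the buffer holds them all, so >!! returns at once.
    (doseq [i (range n)]
      (a/>!! input-chan i))
    ;; Closing the input lets the pipeline close output-chan when it is done.
    (a/close! input-chan)
    ;; a/into yields a single vector once output-chan has closed.
    (a/<!! (a/into [] output-chan))))
```

Because the pipeline emits results in input order, `(process 4 9)` returns the results in the order the requests were scheduled; with 4 workers and a one-second call, nine requests need roughly three rounds.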

Leon Grapenthin
  • Yay! Nice solution, thanks! But is there a performance difference between this method and using futures? As I understand it, N threads (= futures) will be created to handle the blocking operations. Is it impossible to wait for blocking requests using just 1-2 threads with parking? – Defake Oct 21 '17 at 16:01
  • @Defake Not sure I understand your question, but you can change `4` to `2` or `1` and then there will only be 2 threads or 1. – Leon Grapenthin Oct 21 '17 at 16:14
  • @Defake What exactly do you intend to do with parking? – Leon Grapenthin Oct 21 '17 at 16:17
  • You can, if you want, add a backchannel. Put requests like this into `input-chan`: `{:request-data "whatever-the-java-thing-needs", :response (promise-chan)}`. Then make `unchangeable-lib-request` put the response on `:response` via `>!!`. Then do `(go (let [resp (promise-chan)] (>! input-chan {:request-data ..., :response resp}) (<! resp)))`. This block will be parked until the result is calculated. Make sure to use a `dropping-buffer` for `output-chan`, though. – Leon Grapenthin Oct 21 '17 at 16:22
  • I mean that there is multithreaded programming (where a program creates many threads, one for each request) and asynchronous programming (where, as far as I understand, a program creates one thread and launches requests one by one, and they don't block each other and don't need separate threads). For example, [this](https://stackoverflow.com/a/34681101/5008606) post about concurrent programming explains what I am saying. – Defake Oct 21 '17 at 17:14
  • Yes but what do you want to achieve in those terms that my solution doesn't provide? – Leon Grapenthin Oct 21 '17 at 18:34
  • Your solution creates as many threads as there are requests being processed (not exactly, but let's say so). My question is: does this solution perform as well as a single-threaded asynchronous solution? I just know that the JVM spends some memory and CPU on each existing thread, so if you create many threads, the program will lag. Am I wrong? – Defake Oct 21 '17 at 21:11
  • The example program runs 4 threads, no more and no less. They are shut down once you close `input-chan`. This is perfectly fine, JVM-wise. What is problematic is having one thread per request, because the threads will be blocking each other. – Leon Grapenthin Oct 22 '17 at 07:59
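
The backchannel suggested in the comments above could look roughly like the sketch below. The names `handle-request` and `request-async` are hypothetical helpers introduced here, and the buffer sizes and the parallelism of 4 are illustrative; only `pipeline-blocking`, `promise-chan`, `dropping-buffer`, and the channel operations come from core.async itself:

```clojure
(require '[clojure.core.async :as a])

;; Stand-in for the blocking Java call, copied from the question.
(defn unchangeable-lib-request [n]
  (Thread/sleep 1000)
  n)

;; Hypothetical wrapper: runs the blocking call on a pipeline thread and
;; delivers the result on the request's own :response channel.
(defn handle-request [{:keys [request-data response]}]
  (a/>!! response (unchangeable-lib-request request-data))
  ;; Return a non-nil marker, since nil cannot travel through a channel.
  :done)

(def input-chan (a/chan 1000))

;; Nothing reads output-chan in this sketch, so a dropping buffer keeps
;; the workers from blocking on it.
(def output-chan (a/chan (a/dropping-buffer 1)))

(a/pipeline-blocking 4 output-chan (map handle-request) input-chan)

(defn request-async
  "Returns a channel that yields the response. The go block parks while
  waiting, so it does not hold a thread from the go-block pool."
  [data]
  (a/go
    (let [resp (a/promise-chan)]
      (a/>! input-chan {:request-data data :response resp})
      (a/<! resp))))
```

With this in place, `(a/<!! (request-async 42))` yields 42 after about a second, and many `request-async` calls can be outstanding at once while only the 4 pipeline threads ever block on the library.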