
The number of executions of a function should be throttled: after the function is called, any repeated call within a given time period should be ignored. If there were calls in the meantime, the last one should be executed once the time period has elapsed.

Here's my approach with core.async. The problem is that additional calls pile up in the channel c. I'd need a channel with only one slot, whose content is overwritten by put! every time.

(defn throttle [f time]
  (let [c (chan 1)]
    (go-loop []
      (apply f (<! c))
      (<! (timeout time))
      (recur))
    (fn [& args]
      (put! c (if args args [])))))

Usage (with the desired output as comments):

(def throttled (throttle #(print %) 4000))
(doseq [x (range 10)]
   (throttled x))

; 0
;... after 4 seconds
; 9

Does anyone have an idea how to fix this?

Solution

(defn throttle [f time]
  (let [c (chan (sliding-buffer 1))]
    (go-loop []
      (apply f (<! c))
      (<! (timeout time))
      (recur))
    (fn [& args]
      (put! c (or args [])))))
Anton Harald
  • https://github.com/swannodette/swannodette.github.com/blob/e8a26994fcf4e2bdf0cca67e165f5e1706fc192b/code/blog/src/blog/utils/reactive.cljs#L239 – ClojureMostly Feb 27 '16 at 06:20
  • See also: http://aleph.io/codox/manifold/manifold.stream.html#var-throttle – muhuk Feb 27 '16 at 06:54

4 Answers


To solve your channel question you can use a chan with a sliding buffer:

user> (require '[clojure.core.async :as async])
nil
user> (def c (async/chan (async/sliding-buffer 1)))
#'user/c
user> (async/>!! c 1)
true
user> (async/>!! c 2)
true
user> (async/>!! c 3)
true
user> (async/<!! c)
3

That way, only the most recent value put into the channel is processed at the next interval.
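For contrast, here is a minimal sketch (not part of the original answer) of why calls pile up with the plain `(chan 1)` from the question. Once the single buffer slot is full, `put!` queues further values as pending puts instead of dropping them, so a consumer still sees every value in order. The names `fixed` and `taken` are only illustrative.

```clojure
(require '[clojure.core.async :as async])

;; Fixed buffer of size 1, as in the question's first attempt.
(def fixed (async/chan 1))

;; put! never blocks: the first value fills the buffer,
;; the others wait in the channel's pending-put queue.
(async/put! fixed 1)
(async/put! fixed 2)
(async/put! fixed 3)

;; Every value is still delivered, oldest first.
(def taken [(async/<!! fixed) (async/<!! fixed) (async/<!! fixed)])
;; taken => [1 2 3]
```

With `(async/sliding-buffer 1)` the older values would be dropped instead, which is the behavior the question is after.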

Arthur Ulfeldt
  • IMO that's not what throttling is: if producer and consumer are both fast, no messages are discarded. – ClojureMostly Feb 27 '16 at 06:22
  • The answer is not complete, but it's not incorrect either. A combination of some consumer with a timeout **and** using a `sliding-buffer` would make a complete answer. – muhuk Feb 27 '16 at 06:56

You can use a debounce function.

I'll copy it out here:

(defn debounce [in ms]
  (let [out (chan)]
    (go-loop [last-val nil]
      (let [val (if (nil? last-val) (<! in) last-val)
            timer (timeout ms)
            [new-val ch] (alts! [in timer])]
        (condp = ch
          timer (do (>! out val) (recur nil))
          in (recur new-val))))
    out))

Here, the last value emitted by `in` is forwarded to the `out` channel only after `in` has been silent for `ms` milliseconds. As long as `in` keeps emitting without a long enough pause between messages, every message except the most recent one is discarded.

I've tested this function. It waits 4 seconds and then prints out 9, which is nearly what you asked for - some tweaking required!

(defn my-sender [to-chan values]
  (go-loop [[x & xs] values]
    (>! to-chan x)
    (when (seq xs) (recur xs))))

(defn my-receiver [from-chan f]
  (go-loop []
    (let [res (<! from-chan)]
      (f res)
      (recur))))

(defn setup-and-go []
  (let [in (chan)
        ch (debounce in 4000)
        sender (my-sender in (range 10))
        receiver (my-receiver ch #(log %))]))

And this is a version of debounce that produces the output the question asks for: 0 immediately, then a four-second wait, then 9:

(defn debounce [in ms]
  (let [out (chan)]
    (go-loop [last-val nil
              first-time true]
      (let [val (if (nil? last-val) (<! in) last-val)
            timer (timeout (if first-time 0 ms))
            [new-val ch] (alts! [in timer])]
        (condp = ch
          timer (do (>! out val) (recur nil false))
          in (recur new-val false))))
    out))

I've used log rather than print as you did. You can't rely on ordinary println/print functions with core.async. See here for an explanation.

Chris Murphy
  • This looks like a more complex construction, which I cannot immediately grasp. Doesn't my approach make sense in a way, except for the already mentioned misbehavior? I mean, couldn't this be easily fixed somehow? – Anton Harald Feb 26 '16 at 23:36

This is taken from the source code of David Nolen's blog:

(defn throttle*
  ([in msecs]
    (throttle* in msecs (chan)))
  ([in msecs out]
    (throttle* in msecs out (chan)))
  ([in msecs out control]
    (go
      (loop [state ::init last nil cs [in control]]
        (let [[_ _ sync] cs]
          (let [[v sc] (alts! cs)]
            (condp = sc
              in (condp = state
                   ::init (do (>! out v)
                            (>! out [::throttle v])
                            (recur ::throttling last
                              (conj cs (timeout msecs))))
                   ::throttling (do (>! out v)
                                  (recur state v cs)))
              sync (if last 
                     (do (>! out [::throttle last])
                       (recur state nil
                         (conj (pop cs) (timeout msecs))))
                     (recur ::init last (pop cs)))
              control (recur ::init nil
                        (if (= (count cs) 3)
                          (pop cs)
                          cs)))))))
    out))

(defn throttle-msg? [x]
  (and (vector? x)
       (= (first x) ::throttle)))

(defn throttle
  ([in msecs] (throttle in msecs (chan)))
  ([in msecs out]
    (->> (throttle* in msecs out)
      (filter #(and (vector? %) (= (first %) ::throttle)))
      (map second))))

You probably also want to add a dedupe transducer to the channel.
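The `filter` and `map` calls in `throttle` above are applied to a channel, so they presumably refer to channel-based helpers from the blog's namespace rather than `clojure.core`'s sequence functions. As a rough sketch of the same idea with plain core.async, the filtering, unwrapping and deduplication can be expressed as a transducer attached to a channel. The names `xform`, `out` and `result`, and the buffer size of 10, are assumptions made for this illustration only.

```clojure
(require '[clojure.core.async :as async])

(defn throttle-msg? [x]
  (and (vector? x)
       (= (first x) ::throttle)))

;; Keep only [::throttle v] messages, unwrap v,
;; and drop consecutive duplicates.
(def xform
  (comp (filter throttle-msg?)
        (map second)
        (dedupe)))

;; A transducer requires the channel to have a buffer.
(def out (async/chan 10 xform))

(doseq [msg [1 [::throttle 1] 2 [::throttle 1] [::throttle 2]]]
  (async/>!! out msg))
(async/close! out)

(def result (async/<!! (async/into [] out)))
;; result => [1 2]
```

The raw values 1 and 2 are filtered out, and the second `[::throttle 1]` is removed by `dedupe` because it repeats the previous throttled value.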

ClojureMostly
  • Thanks! A second exemplary code sample showing a practical application would make your answer tops :) Would also like to see the dedupe in action – Petrus Theron Mar 18 '18 at 10:00

I needed to pass in an extra function that captures the args, because I was using the throttled function as an input event handler and the event passed to it is a mutable object.

(defn throttle-for-mutable-args [time f arg-capture-fn]
  (let [c (async/chan (async/sliding-buffer 1))]
    (async-m/go-loop []
      (f (async/<! c))
      (async/<! (async/timeout time))
      (recur))
    (fn [& args]
      (async/put! c (apply arg-capture-fn (or args []))))))

And I use it like this:

[:input
  {:onChange (util/throttle-for-mutable-args                                      
               500
               #(really-use-arg %)                                 
               #(-> % .-target .-value))}]
Jp_