Scenario: I have a server listening to six active TCP/IP connections. When a "ready" message comes in, an event will be raised on its own thread. When the server has received a "ready" message from each connection, it needs to run the "start" function.

My object-oriented solution would likely involve using a mutex and a counter. Something like:

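int _countDown = 6;
readonly object _lock = new object(); // must be initialized: lock(null) throws

void ReadyMessageReceivedForTheFirstTimeFromAConnection() {
    lock (_lock) {
        --_countDown; // one fewer connection still pending
        if (_countDown == 0) Start();
    }
}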

How could this problem be solved in Clojure without resorting to locks/mutexes?

Anonymous

3 Answers

You can use a CountDownLatch or a Phaser for this purpose.

In my futures library, imminent, I used both: first a CountDownLatch, which I later replaced with a Phaser for ForkJoin compatibility (that might not be necessary in your case). You can see the change in this diff. Hopefully it gives you an idea of usage for both.
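For the Phaser variant, a minimal sketch might look like the following. It is not taken from imminent; the function name `await-all-ready` and the simulated connections (futures that sleep briefly) are assumptions made for illustration.

```clojure
(import 'java.util.concurrent.Phaser)

(defn await-all-ready
  "Hypothetical helper: simulates n connections that each report ready once,
  then blocks until all of them have arrived. Returns the new phase number."
  [n]
  (let [phaser (Phaser. (int n))]        ; register n parties up front
    (dotimes [_ n]
      (future
        (Thread/sleep (rand-int 100))    ; stand-in for waiting on a "ready" message
        (.arrive phaser)))               ; signal arrival without blocking
    (.awaitAdvance phaser (int 0))))     ; block until phase 0 has completed

(await-all-ready 6)
;;=> 1
```

Each party calls `.arrive` once, and the waiting thread calls `.awaitAdvance` with the phase it expects to finish (phase 0 for a fresh Phaser).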

With latches the general idea would be:

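(import 'java.util.concurrent.CountDownLatch)
(let [latch (CountDownLatch. 6)]
  ;; on-receive-message stands in for whatever registers your "ready" callback
  (on-receive-message this (fn [_] (.countDown latch)))
  (.await latch)) ; blocks until countDown has been called six times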

...or something like that.
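To try the latch idea at the REPL without a real server, here is a self-contained sketch. The name `run-when-all-ready` is hypothetical, the connections are simulated with futures, and the five-second timeout is an arbitrary choice for the example.

```clojure
(import '(java.util.concurrent CountDownLatch TimeUnit))

(defn run-when-all-ready
  "Hypothetical helper: simulates n connections, each counting the latch down
  once, and calls start-fn after all have reported. Returns :timed-out if the
  count does not reach zero within five seconds."
  [n start-fn]
  (let [latch (CountDownLatch. (int n))]
    (dotimes [_ n]
      (future
        (Thread/sleep (rand-int 100))   ; stand-in for waiting on a "ready" message
        (.countDown latch)))
    ;; await with a timeout returns true if the count reached zero in time
    (if (.await latch 5 TimeUnit/SECONDS)
      (start-fn)
      :timed-out)))

(run-when-all-ready 6 (fn [] :started))
;;=> :started
```

The timed `.await` is optional, but it keeps the waiting thread from blocking forever if one connection never sends its message.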

leonardoborges
  • Exactly this. Using a mutex and a counter is wrong in an imperative world too: find the right concurrency primitives for the job, instead of reimplementing everything on top of mutexes. – amalloy Feb 12 '16 at 00:54
  • Sorry if this is a dumb question, but how does Java's "this" keyword work in Clojure? Is there any documentation on this? – Anonymous Feb 17 '16 at 17:46
  • It's not a dumb question at all. 'this' in Clojure has no special meaning. It's just convention to use 'this' when you're implementing a protocol or an interface. The confusion in the example above is that this is defined outside the let block. That's probably what tripped you. – leonardoborges Feb 17 '16 at 22:33
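To illustrate the point about `this` being an ordinary parameter name, here is a small sketch. The protocol `Greeter` and the two records are made up for the example.

```clojure
(defprotocol Greeter
  (greet [self who]))

;; Conventional spelling: the first parameter is named `this`.
(defrecord English [greeting]
  Greeter
  (greet [this who] (str (:greeting this) ", " who)))

;; Any other name works just as well; it is only the first argument.
(defrecord Spanish [greeting]
  Greeter
  (greet [me who] (str (:greeting me) ", " who)))

(greet (->English "Hello") "Ann")
;;=> "Hello, Ann"
(greet (->Spanish "Hola") "Ann")
;;=> "Hola, Ann"
```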

If you prefer a pure Clojure version, you can use a promise to give your futures the go-ahead.

Every time you receive a message, you increment conn-count. The watch checks whether the threshold has been reached and, if so, delivers :go to the barrier promise.

(def wait-barrier (promise))
(def conn-count (atom 0))

(add-watch conn-count :barrier-watch
           (fn [key ref old-state new-state]
             (when (== new-state 6)
               (deliver wait-barrier :go))))  

dummy-example:

(def wait-barrier (promise))
(def conn-count (atom 0))
(defn worker-dummy []
  (when (= @wait-barrier :go)
    (println "I'm a worker")))

(defn dummy-receive-msg []
  (doall (repeatedly 6,
                     (fn []
                       (println "received msg")
                       (swap! conn-count inc)))))

(let [workers (doall (repeatedly 6 (fn [] (future (worker-dummy)))))]
  (add-watch conn-count :barrier-watch
             (fn [key ref old-state new-state]
               (when (== new-state 6)
                 (deliver wait-barrier :go))))
  (dummy-receive-msg)
  (doall (map deref workers)))
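The question's handler fires only for the first "ready" message from each connection. A variation on the atom approach that also tolerates duplicate messages could track connection ids in a set instead of a counter. This is only a sketch: `make-ready-handler` and `starts` are hypothetical names, and `swap-vals!` requires Clojure 1.9 or later.

```clojure
(defn make-ready-handler
  "Hypothetical helper: returns a function to call with a connection id on
  every ready message. start-fn runs exactly once, when the nth distinct
  connection reports ready."
  [n start-fn]
  (let [ready-conns (atom #{})]
    (fn [conn-id]
      ;; swap-vals! atomically returns the set before and after the update
      (let [[before after] (swap-vals! ready-conns conj conn-id)]
        ;; only the single swap that grows the set to n elements triggers start
        (when (and (= n (count after))
                   (< (count before) n))
          (start-fn))))))

;; Usage: every connection reports twice, from separate threads.
(def starts (atom 0))

(let [on-ready (make-ready-handler 6 #(swap! starts inc))]
  (->> (concat (range 6) (range 6))
       (mapv #(future (on-ready %)))
       (run! deref)))

@starts
;;=> 1
```

Because only one `swap-vals!` call can observe the transition from n-1 to n ready connections, no lock is needed to guarantee that `start-fn` runs once.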
muhuk
murphy
  • (str "+" 1) for no interop answer. If just for learning value. BTW you don't need `(not= old-state new-state)` in the second snippet, `deliver` is a noop when it's called on an already realized promise. – muhuk Feb 12 '16 at 08:54
  • You are right. I can't approve the edit, though, due to a lack of privileges. – murphy Feb 12 '16 at 09:15

Since it hasn't been mentioned so far: you could easily do that with core.async. Have a look at this MCVE:

(require '[clojure.core.async :refer [chan thread >!! <!!]])

(let [conn-count 6
      ready-chan (chan)]

  ;; Spawn a thread for each connection.
  (doseq [conn-id (range conn-count)]
    (thread
      (Thread/sleep (rand-int 2000))
      (>!! ready-chan conn-id)))

  ;; Block until all connections are established.
  (doseq [total (range 1 (inc conn-count))]
    (println (<!! ready-chan) "connected," total "overall"))

  ;; Invoke start afterwards.
  (println "start"))
;; 5 connected, 1 overall
;; 3 connected, 2 overall
;; 4 connected, 3 overall
;; 0 connected, 4 overall
;; 1 connected, 5 overall
;; 2 connected, 6 overall
;; start
;;=> nil

You could also use a channel to implement a countdown latch (borrowed from Christophe Grand):

(defn count-down-latch-chan [n]
  (chan 1 (comp (drop (dec n)) (take 1))))
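A usage sketch for the channel-based latch, assuming core.async is on the classpath. The wrapper `await-latch-demo` and the simulated connections are hypothetical and only there to make the snippet runnable.

```clojure
(require '[clojure.core.async :refer [chan thread >!! <!!]])

(defn count-down-latch-chan [n]
  (chan 1 (comp (drop (dec n)) (take 1))))

(defn await-latch-demo
  "Hypothetical demo: n simulated connections each put their id on the latch
  channel. Returns the id of the connection whose put released the latch."
  [n]
  (let [latch (count-down-latch-chan n)]
    (doseq [conn-id (range n)]
      (thread
        (Thread/sleep (rand-int 100))   ; stand-in for waiting on a "ready" message
        (>!! latch conn-id)))
    ;; The transducer drops the first n-1 puts, so this take blocks
    ;; until the nth put arrives.
    (<!! latch)))

(await-latch-demo 6)
```

The first five puts are swallowed by `drop`, and `(take 1)` lets the sixth value through and then closes the channel, so any further takes return nil.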

For a short introduction to core.async, check out this Gist. For a longer one, read the corresponding chapter in "Clojure for the Brave and True".

beatngu13