
I'm trying to figure out the best way to use agents to consume items from a Message Queue (Amazon SQS). Right now I have a function (process-queue-item) that grabs an item from the queue and processes it.

I want to process these items concurrently, but I can't wrap my head around how to control the agents. Basically I want to keep all of the agents as busy as possible without pulling too many items from the queue and building up a backlog (I'll have this running on a couple of machines, so items need to be left in the queue until they are really needed).

Can anyone give me some pointers on improving my implementation?

(def active-agents (ref 0))

(defn process-queue-item [_]
  (dosync (alter active-agents inc))
  ;retrieve item from Message Queue (Amazon SQS) and process
  (dosync (alter active-agents dec)))

(defn -main []
  (def agents (for [x (range 20)] (agent x)))

  (loop [loop-count 0]

    (if (< @active-agents 20)
      (doseq [agent agents]
        (if (agent-errors agent)
          (clear-agent-errors agent))
        ;should skip this agent until later if it is still busy processing (not sure how)
        (send-off agent process-queue-item)))

    ;(apply await-for (* 10 1000) agents)
    (Thread/sleep  10000)
    (logging/info (str "ACTIVE AGENTS " @active-agents))
    (if (> loop-count 10)
      (do (logging/info (str "done, let's cleanup " loop-count))
       (doseq [agent agents]
         (if (agent-errors agent)
           (clear-agent-errors agent)))
       (apply await agents)
       (shutdown-agents))
      (recur (inc loop-count)))))
erikcw
  • Is there some way you could treat the Message Queue as a seq and then just use pmap to get the parallelization? – Alex Stoddard Apr 08 '10 at 21:58
  • @Alex Stoddard: In my case, process-queue-item actually blocks on network IO, so I don't think pmap is the right choice since it only uses as many threads as the machine has cores. – erikcw Apr 08 '10 at 23:01
  • @erikw: Sure, but that is just a pmap implementation detail (threads = #cores + 2). No reason you couldn't write a version of pmap with a parameterized number of threads. See the first line of the pmap source: (let [n (+ 2 (.. Runtime getRuntime availableProcessors)) – Alex Stoddard Apr 09 '10 at 14:29
  • Hi, I have several questions: 1. agents have a value, are you interested in their value or are you just using it as a threadpool? 2. is there a final result to the queue consumption or does process-queue-item perform side-effects? – cgrand Apr 12 '10 at 12:59
  • @cgrand: 1) I'm not interested in the value of the agents, just using them as a threadpool. 2) process-queue-item has side-effects (pushes results back out to a Message Queue). – erikcw Apr 13 '10 at 02:11
  • @erikcw 1) and you are more interested in knowing if/when the job is done, so prefer futures over agents. 2) That's what I guessed. Did you see my answer: http://stackoverflow.com/questions/2602791/clojure-agents-consuming-from-a-queue/2622410#2622410 ? – cgrand Apr 13 '10 at 06:59
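Alex Stoddard's parameterized-pmap idea from the comments can be sketched as follows; this is a sketch of mine, essentially clojure.core/pmap's own implementation with the thread count `n` turned into an argument:

```clojure
;; like pmap, but keeps at most n items in flight instead of
;; deriving the limit from (+ 2 available-processors)
(defn bounded-pmap [n f coll]
  (let [rets (map #(future (f %)) coll)
        step (fn step [[x & xs :as vs] fs]
               (lazy-seq
                 (if-let [s (seq fs)]
                   ;; forcing fs keeps realization n items ahead of consumption
                   (cons (deref x) (step xs (rest s)))
                   (map deref vs))))]
    (step rets (drop n rets))))
```

Note that with chunked seqs (like `range`) more than `n` futures can be launched at once, since realization happens a chunk at a time.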

4 Answers

(let [switch (atom true) ; a switch to stop workers
      workers (doall
                (repeatedly 20 ; 20 workers pulling and processing items from SQS
                  ;; retrieve-and-process-item is a placeholder for your
                  ;; "pull one item from Amazon SQS and process it" logic
                  #(future (while @switch
                             (retrieve-and-process-item))))))]
  (Thread/sleep 100000) ; arbitrary rule to decide when to stop ;-)
  (reset! switch false) ; stop !
  (doseq [worker workers] @worker)) ; waiting for all workers to be done
cgrand
  • This does not work with 1.4 anymore (`future` and `future-call` do not return `IFn`, which `repeatedly` requires). You can easily wrap the future in a function, though, by prepending `(future` with `#`. – Alex B Jun 08 '12 at 10:18
  • @AlexB good catch, it's not even a 1.4 problem: the # should have been there. I fixed the code, thanks! – cgrand Jun 08 '12 at 13:46

What you are asking for is a way to keep handing out tasks but with some upper limit. One simple approach to this is to use a semaphore to coordinate the limit. Here is how I would approach it:

(let [limit (.availableProcessors (Runtime/getRuntime))
      ; note: you might choose limit 20 based upon your problem description
      sem (java.util.concurrent.Semaphore. limit)]
  (defn submit-future-call
    "Takes a function of no args and yields a future object that will
    invoke the function in another thread, and will cache the result and
    return it on all subsequent calls to deref/@. If the computation has
    not yet finished, calls to deref/@ will block. 
    If n futures have already been submitted, then submit-future blocks
    until the completion of another future, where n is the number of
    available processors."  
    [#^Callable task]
    ; take a slot (or block until a slot is free)
    (.acquire sem)
    (try
      ; create a future that will free a slot on completion
      (future (try (task) (finally (.release sem))))
      (catch java.util.concurrent.RejectedExecutionException e
        ; no task was actually submitted
        (.release sem)
        (throw e)))))

(defmacro submit-future
  "Takes a body of expressions and yields a future object that will
  invoke the body in another thread, and will cache the result and
  return it on all subsequent calls to deref/@. If the computation has
  not yet finished, calls to deref/@ will block.
  If n futures have already been submitted, then submit-future blocks
  until the completion of another future, where n is the number of
  available processors."  
  [& body] `(submit-future-call (fn [] ~@body)))

#_(example
    user=> (submit-future (reduce + (range 100000000)))
    #<core$future_call$reify__5782@6c69d02b: :pending>
    user=> (submit-future (reduce + (range 100000000)))
    #<core$future_call$reify__5782@38827968: :pending>
    user=> (submit-future (reduce + (range 100000000)))
    ;; blocks at this point for a 2 processor PC until the previous
    ;; two futures complete
    #<core$future_call$reify__5782@214c4ac9: :pending>
    ;; then submits the job

With that in place, now you just need to coordinate how the tasks themselves are taken. It sounds like you already have the mechanisms in place to do that. Then just loop, calling `(submit-future (process-queue-item))`.
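That loop could look like the sketch below. This is my sketch, not part of the original answer: `running?` is a hypothetical stop switch, and `submit-one` stands in for `#(submit-future (process-queue-item))`:

```clojure
;; a hypothetical driver: submit-one is expected to block (as
;; submit-future does when all semaphore slots are taken), so the
;; loop never pulls items faster than the workers can process them
(defn consume-queue [running? submit-one]
  (while @running?
    (submit-one)))
```

Called as `(consume-queue running? #(submit-future (process-queue-item)))`, the semaphore inside `submit-future` is what keeps the backlog bounded.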

Timothy Pratley

Perhaps you could use the seque function? Quoting (doc seque):

clojure.core/seque
([s] [n-or-q s])
  Creates a queued seq on another (presumably lazy) seq s. The queued
  seq will produce a concrete seq in the background, and can get up to
  n items ahead of the consumer. n-or-q can be an integer n buffer
  size, or an instance of java.util.concurrent BlockingQueue. Note
  that reading from a seque can block if the reader gets ahead of the
  producer.

What I have in mind is a lazy sequence getting queue items over the network; you'd wrap this in seque, put that in a Ref and have worker Agents consume items off of this seque. seque returns something which looks just like a regular seq from the point of view of your code, with the queue magic happening in a transparent way.

Note that if the sequence you put inside is chunked, then it'll still be forced a chunk at a time. Also note that the initial call to seque itself seems to block until an initial item or two is obtained (or a chunk, as the case may be; I think that's more to do with the way lazy sequences work than seque itself, though).
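For reference, seque's read-ahead can be observed in isolation; this is a minimal sketch of mine, where the `realized` counter exists only to make the background realization visible:

```clojure
;; each produced item bumps a counter as a side effect, so we can see
;; that seque realizes items in the background, ahead of the consumer
(def realized (atom 0))
(def producer (map (fn [x] (swap! realized inc) x) (range 1000)))
(def buffered (seque 5 producer)) ; reads ahead up to ~5 items (a chunk at a time here)
```

Shortly after `seque` is called, `@realized` is already positive even though nothing has consumed `buffered` yet.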

A sketch of the code (a really sketchy one, not tested at all):

(defn get-queue-items-seq []
  (lazy-seq
   (cons (get-queue-item)
         (get-queue-items-seq))))

(def task-source (ref (seque (get-queue-items-seq))))

(defn do-stuff []
  (let [worker (agent nil)]
    (if-let [result
             (dosync
               (when-let [task (first @task-source)]
                (send worker (fn [_] (do-stuff-with task)))))]
      (do (await worker)
          ;; maybe do something with worker's state
          (do-stuff))))) ;; continue working

(defn do-lots-of-stuff []
  (let [fs (doall (repeatedly 20 #(future (do-stuff))))]
    fs))

Actually you'd probably want a more complex producer of the queue item seq so that you can ask it to stop producing new items (a necessity if the whole thing is to be able to be shut down gracefully; the futures will die when the task source runs dry, use future-done? to see if they've done so already). And that's just something I can see at first glance... I'm sure there's more things to polish here. I think that the general approach would work, though.
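Such a stoppable producer could be sketched like this (the names here are hypothetical, not from the answer above; `get-item` stands in for `get-queue-item`):

```clojure
;; a stoppable item source: the lazy seq simply ends once the switch
;; is flipped to false, so downstream consumers drain what's already
;; realized and then exit on their own
(defn stoppable-items-seq [running? get-item]
  (lazy-seq
    (when @running?
      (cons (get-item) (stoppable-items-seq running? get-item)))))
```

Flipping `running?` to false makes the seq end at the next unrealized element, which is exactly the "task source runs dry" condition the futures wait for.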

Michał Marczyk
  • I've added a fix to the last-but-one line of the code sketch whereby futures will actually be created. (Kind of crucial to the whole idea, really... :-)) – Michał Marczyk Apr 09 '10 at 16:02
  • I'm trying to understand this code. Why is the task-source a ref? You don't seem to alter it at any time at all. – Siddhartha Reddy Sep 05 '10 at 04:05
  • @Siddhartha Reddy: At first glance I'd say that this is why I called the code "*really* sketchy". ;-) I guess it would need an `(alter task-source rest)` (or `next`) in the `when-let` inside the `dosync` to be useful. Actually, thinking about this again, I wonder if using `seque` here is such a good idea after all; it seems to me now that it increases the number of items from the queue which would be lost in the event of a crash of the local machine (since `seque` pulls items in before they are requested by the workers). Then again, in some scenarios it might be good performance-wise; that's – Michał Marczyk Sep 05 '10 at 23:50
  • just a hunch in need of profiling, though. Incidentally, the tail self call in `do-stuff` should be changed to `(recur)` so as to avoid blowing up the stack. I think I did a much better job describing queue handling in Clojure about a month later in [this answer](http://stackoverflow.com/questions/2760017/producer-consumer-with-qualifications/2760224#2760224); I wonder if it might be more useful to you than this one? At any rate, thanks for pointing out this bug! – Michał Marczyk Sep 06 '10 at 00:00

Not sure how idiomatic this is, as I'm still a newbie with the language, but the following solution works for me:

(let [number-of-messages-per-time 2
      await-timeout 1000]
  (doseq [p-messages (partition number-of-messages-per-time messages)]
    (let [agents (map agent p-messages)]
      (doseq [a agents] (send-off a process)) ; process is the per-message handler
      (apply await-for await-timeout agents)
      (doall (map deref agents))))) ; doall forces the lazy map so the derefs actually run
Marco Lazzeri
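One caveat worth noting with the answer above (my note, not part of the original): `partition` silently drops a final group smaller than `number-of-messages-per-time`, so leftover messages would never be processed; `partition-all` keeps the remainder:

```clojure
;; partition drops the trailing incomplete group...
(partition 2 [1 2 3 4 5])     ; message 5 is never seen
;; ...while partition-all keeps it as a shorter final group
(partition-all 2 [1 2 3 4 5])
```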