
I am learning Clojure and trying out its concurrency features with a producer-consumer example.

I did that, and it felt pretty awkward to have to use ref and deref, and also add-watch and remove-watch.

I looked at other code snippets, but is there a better way to refactor this than using Java's Condition await() and signal() methods together with a Java Lock? I did not want to use anything from Java.

Here is the code. I guess I have made many mistakes in how I use these features...

;a simple producer
(ns my.clojure.producer
  (:use my.clojure.consumer)
  (:gen-class))

(def tasklist (ref (list))) ; declared as a global variable; to make it
                            ; mutable we need to wrap it with ref

(defn gettasklist []
  (deref tasklist)) ; we need deref to return the task list

(def testagent (agent 0)) ; create an agent

;; watch function: called with the key, the ref, and its old and new
;; values every time tasklist changes
(defn emptytasklist [akey aref old-val new-val]
  (doseq [item (gettasklist)]
    (println "item is" item)
    (send testagent consume item)
    (send testagent increment item))
  (Thread/sleep 1000)
  (dosync ; a transaction is needed to reset the ref
    (remove-watch tasklist "key123") ; remove the watch on tasklist so that resetting
                                     ; it does not trigger a recursive call
    (ref-set tasklist (list)) ; ref-set can only be used inside dosync
    (println "The number of tasks now remaining is =" (count (gettasklist))))
  (add-watch tasklist "key123" emptytasklist))

(add-watch tasklist "key123" emptytasklist)

(defn addtask [task]
  (dosync ; a transaction is needed for ref-set
    ;(println "The number of tasks before" (count (gettasklist)))
    (println "Adding a task" task)
    (ref-set tasklist (conj (gettasklist) task)) ; ref-set reassigns the ref
    ;(println "The number of tasks after" (count (gettasklist)))
    ))

Here is the consumer code:

(ns my.clojure.consumer)

(defn consume [c item]
  (println "In the consume method: Item is" c item)
  item)

(defn increment [c n]
  (println "parameters are" c n)
  (+ c n))

And here is the test code. I used Maven to run the Clojure code and NetBeans to edit it, because that is more familiar to me coming from Java. The folder structure and pom are at https://github.com/alexcpn/clojure-evolve

(ns my.clojure.Testproducer
  (:use my.clojure.producer)
  (:use clojure.test)
  (:gen-class))

(deftest test-addandcheck
  (addtask 1)
  (addtask 2)
  (is (= 0 (count (gettasklist))))
  (println "The number of tasks are" (count (gettasklist))))

If anybody can refactor this lightly so that I can read and understand the code, that would be great. Otherwise I guess I will have to learn more.

Edit 1

I guess it is not the Clojure way to use a global task list, expose it to other functions by dereferencing it (deref), and then make it mutable again with ref.

So I changed the addtask function to send incoming tasks directly to an agent:

(defn addtask [task]
  (dosync ; a transaction is needed for ref-set
    (println "Adding a task" task)
    ;(ref-set tasklist (conj (gettasklist) task)) ; ref-set reassigns the ref
    (def testagent (agent 0)) ; create an agent
    (send testagent consume task)
    (send testagent increment task)))

However, when I tested it with:

(deftest test-addandcheck
  (loop [task 0]
    (when (< task 100)
      (addtask task)
      (recur (inc task))))
  (is (= 0 (count (gettasklist))))
  (println "The number of tasks are" (count (gettasklist))))

after some time I get a Java RejectedExecutionException. That would be fine with plain Java threads, because there you have full control. From Clojure it looks odd, especially since you do not choose the thread pool strategy yourself:

Adding a task 85
Exception in thread "pool-1-thread-4" java.util.concurrent.RejectedExecutionException
        at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:1759)
        at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:767)
        at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:658)
        at clojure.lang.Agent$Action.execute(Agent.java:56)
        at clojure.lang.Agent$Action.doRun(Agent.java:95)
        at clojure.lang.Agent$Action.run(Agent.java:106)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:885)
Adding a task 86
Adding a task 87
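For comparison, here is a minimal sketch of the "send tasks to an agent" idea from this edit. It assumes a single agent defined once at the top level instead of a new `def` on every call. It also assumes `await` is used to wait for all sent actions before the result is checked. The names `task-agent`, `process-task` and `add-task!` are hypothetical and do not come from the original code:

```clojure
;; Sketch only: one agent defined once, instead of a new def on every addtask call.
;; task-agent, process-task and add-task! are hypothetical names.
(def task-agent (agent 0)) ; agent state: running total of processed tasks

(defn process-task
  "Agent action: called with the agent's current state and the task;
  its return value becomes the agent's new state."
  [total task]
  (+ total task))

(defn add-task! [task]
  ;; send queues the action and returns immediately; actions sent to
  ;; one agent run one at a time
  (send task-agent process-task task))

(dotimes [task 100]
  (add-task! task))

;; await blocks until all actions sent so far from this thread have run
(await task-agent)
(println "sum of all tasks:" @task-agent) ; prints sum of all tasks: 4950

;; only after await: shut down the agent thread pool so the JVM can exit
(shutdown-agents)
```

Calling `shutdown-agents` only after `await` means no action is ever sent to a pool that has already been shut down.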
Alex Punnen

4 Answers


I think the easiest (and most efficient) way to model a producer and consumer in Clojure is with lamina channels.

MHOOO
  • Thanks, I am sure this is the best way, but I don't want to use any libraries for now since I am learning. – Alex Punnen Jan 23 '12 at 06:33
  • I guess it is not the Clojure way to use a global task list, expose it to other functions by dereferencing it (deref), and then make it mutable again with ref. – Alex Punnen Jan 23 '12 at 11:01

I have written a similar Clojure program using the producer/consumer pattern in Computing Folder Sizes Asynchronously.

I don't think you really need Refs. You have a single tasklist, which is mutable state that you want to change synchronously. The changes are quick and do not depend on any other external state, only on the value that comes in to the consumer.

For atoms there is the swap! function, which can make the changes the way I understand you need them.
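Here is a minimal sketch of how the task list from the question could be an atom updated with `swap!`. The names `tasks`, `add-task!` and `take-all-tasks!` are hypothetical and come from neither the question nor the linked snippet. The consumer takes all pending tasks with a `compare-and-set!` retry loop, so a task added concurrently is never dropped:

```clojure
;; Sketch only: the task list as an atom instead of a ref, so no dosync is needed.
;; tasks, add-task! and take-all-tasks! are hypothetical names.
(def tasks (atom [])) ; a vector keeps tasks in the order they were added

(defn add-task!
  "Producer side: atomically append one task."
  [task]
  (swap! tasks conj task))

(defn take-all-tasks!
  "Consumer side: atomically take every pending task and leave the list empty.
  compare-and-set! only succeeds if no other thread changed the atom since it
  was read; otherwise the loop retries, so a concurrently added task is kept."
  []
  (loop []
    (let [current @tasks]
      (if (compare-and-set! tasks current [])
        current
        (recur)))))

(add-task! 1)
(add-task! 2)
(println (take-all-tasks!)) ; prints [1 2]
(println @tasks)            ; prints []
```

On Clojure 1.9 or later, `(first (swap-vals! tasks (constantly [])))` does the same as `take-all-tasks!` in one call, because `swap-vals!` returns both the old and the new value.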

You can look at my snippet Computing folder size. I think it can at least show you the proper use of atoms and agents. I played with it a lot, so it should be correct.

Hope it helps!

Regards, Jan

jppalencar
  • The tasklist can be altered by any number of producer threads at any time. – Alex Punnen Jan 23 '12 at 06:35
  • That's correct; what matters is the way you access the mutable state. Remember you have **one shared state**. Atoms work wonderfully when you want to ensure atomic updates to an individual piece of state. _I do not see that you are coordinating any updates._ The producer produces data, and the consumer consumes data from the common shared state. – jppalencar Jan 24 '12 at 09:51
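To illustrate atomic updates from many producer threads, here is a small sketch with hypothetical names (`task-list`, `producers`). Four producer threads, started with `future`, each append 250 tasks to one atom. Because `swap!` retries when another thread changed the atom first, none of the 1000 additions is lost:

```clojure
;; Sketch only: 4 producer threads appending to one shared atom.
;; task-list and producers are hypothetical names.
(def task-list (atom []))

(def producers
  ;; doall forces the lazy for, so all four futures start now
  (doall
    (for [p (range 4)]
      (future
        (dotimes [i 250]
          ;; swap! applies conj atomically; if another thread changed the
          ;; atom first, swap! retries with the new value
          (swap! task-list conj [p i]))))))

;; Dereferencing a future blocks until that producer has finished.
(doseq [f producers] @f)

(println "tasks added:" (count @task-list)) ; prints tasks added: 1000

;; futures run on the agent thread pool; shut it down so the JVM can exit
(shutdown-agents)
```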

I was looking at the Clojure examples for Twitter Storm. He just used LinkedBlockingQueue. It's easy to use, concurrent, and performs well. Sure, it lacks the sex appeal of an immutable Clojure solution, but it will work well.
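Here is a minimal sketch of one producer and one consumer sharing a `LinkedBlockingQueue` through Java interop. The `::done` end-of-work marker (a "poison pill") and all names are assumptions for illustration. They are not taken from the Storm examples:

```clojure
;; Sketch only: producer and consumer threads sharing a LinkedBlockingQueue.
;; queue, producer, consumer and ::done are hypothetical names.
(import '(java.util.concurrent LinkedBlockingQueue))

(def queue (LinkedBlockingQueue.)) ; unbounded, so .put never blocks here

(def producer
  (future
    (doseq [task (range 10)]
      (.put queue task))
    (.put queue ::done))) ; poison pill: tells the consumer the work is over

(def consumer
  (future
    (loop [results []]
      (let [item (.take queue)] ; .take blocks until an item is available
        (if (= item ::done)
          results
          (recur (conj results (* item item))))))))

(println @consumer) ; prints [0 1 4 9 16 25 36 49 64 81]

;; futures run on the agent thread pool; shut it down so the JVM can exit
(shutdown-agents)
```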

BillRobertson42
  • I have written multithreaded programs in Java using BlockingQueue, FutureTask etc., and that is easy and efficient. As a Clojure amateur, Clojure multithreading doesn't appeal to me much if I have to write so much ugly code for a producer-consumer, with so much ref and deref. – Alex Punnen Apr 23 '13 at 12:32

I've come across several use cases where I need the ability to:

  • strictly control the number of worker threads on both the producer and consumer side
  • control the maximum size of the "work queue" in order to limit memory consumption
  • detect when all work has been completed so that I can shut down the workers

I've found that the Clojure built-in concurrency features (while amazingly simple and useful in their own right) make the first two bullet points difficult. lamina looks great, but I didn't see a way for it to address my particular use cases without the same sort of extra plumbing that I'd need around an implementation based on BlockingQueue.

So I ended up hacking together a simple Clojure library to try to solve my problems. It is basically just a wrapper around BlockingQueue that attempts to conceal some of the Java constructs and provide a higher-level producer-consumer API. I'm not entirely satisfied with the API yet, and it will likely evolve a bit further, but it's operational:
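Here is a minimal sketch of the three requirements using plain `java.util.concurrent` interop. It uses a fixed number of worker threads and a bounded `ArrayBlockingQueue`, which makes producers block instead of growing memory. It also puts one poison pill per worker, so that every worker knows when all work is done. All names (`run-pipeline`, `poison`, `work-queue`) are hypothetical, and this is not freemarket's API:

```clojure
;; Sketch only: bounded work queue, fixed worker count, poison-pill shutdown.
;; run-pipeline, poison and work-queue are hypothetical names.
(import '(java.util.concurrent ArrayBlockingQueue))

(def poison ::no-more-work)

(defn run-pipeline
  "Feeds tasks through work-fn using num-workers threads and a work queue
  holding at most capacity items. Returns all results (in no fixed order)."
  [tasks work-fn num-workers capacity]
  (let [work-queue (ArrayBlockingQueue. (int capacity))
        results    (atom [])
        workers    (doall
                     (for [_ (range num-workers)]
                       (future
                         (loop []
                           (let [item (.take work-queue)]
                             (when-not (= item poison)
                               (swap! results conj (work-fn item))
                               (recur)))))))]
    ;; producer: .put blocks while the queue is full, which bounds memory use
    (doseq [t tasks] (.put work-queue t))
    ;; one pill per worker; each worker stops after taking exactly one
    (dotimes [_ num-workers] (.put work-queue poison))
    ;; wait until every worker has finished
    (doseq [w workers] @w)
    @results))

(def results (run-pipeline (range 100) inc 4 10))
(println (count results))    ; prints 100
(println (reduce + results)) ; prints 5050

;; futures run on the agent thread pool; shut it down so the JVM can exit
(shutdown-agents)
```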

https://github.com/cprice-puppet/freemarket

Example usage:

(def myproducer (producer producer-work-fn num-workers max-work))
(def myconsumer (consumer myproducer consumer-work-fn num-workers max-results))
(doseq [result (work-queue->seq (:result-queue myconsumer))]
  (println result))

Feedback / suggestions / contributions would be welcomed!