I'm learning Spark and trying to demonstrate a simple clustering algorithm. Say I find k items in my RDD that form a cluster. The problem is that when I try to take() them to the driver, I get a warning saying "block locks were not released by tid". This also leads to poor performance: 20 iterations of the code below take 11 seconds on 4 cores with a 2 MB dataset, which is much worse than my serial version. The good news is that I still get correct results. Please take a look:
First, I take one item. Any item will do:
List<Tuple2<Long, Instance>> selectedTuple = dataset.take(1);
Instance selected = selectedTuple.get(0)._2;
Then I create an object that finds candidate items for my cluster based on the item I just selected. For the record, it implements PairFlatMapFunction:
NCPComparator comp = new NCPComparator(meta, selected);
The next step finds some candidates:
JavaPairRDD<Long, Double> candidates = this.dataset.mapPartitionsToPair(comp);
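For context, the function passed to mapPartitionsToPair is called once per partition with an iterator over that partition's elements, and it returns an iterator of key/value pairs. The plain-Java sketch below mimics that shape without Spark so it can run on its own. Everything in it is hypothetical: double[] stands in for Instance, Euclidean distance stands in for the real NCP metric, and Map.Entry stands in for scala.Tuple2.

```java
import java.util.*;

public class PartitionScorerSketch {

    // Hypothetical stand-in for NCPComparator's per-partition logic.
    // Assumptions: double[] replaces Instance, Euclidean distance replaces NCP,
    // and Map.Entry replaces scala.Tuple2 so no Spark dependency is needed.
    public static Iterator<Map.Entry<Long, Double>> scorePartition(
            Iterator<Map.Entry<Long, double[]>> partition, double[] selected) {
        List<Map.Entry<Long, Double>> out = new ArrayList<>();
        while (partition.hasNext()) {
            Map.Entry<Long, double[]> e = partition.next();
            double[] x = e.getValue();
            double sum = 0.0;
            for (int i = 0; i < x.length; i++) {
                double d = x[i] - selected[i];
                sum += d * d;
            }
            // Emit (id, score): lower score = better cluster candidate.
            out.add(new AbstractMap.SimpleEntry<>(e.getKey(), Math.sqrt(sum)));
        }
        return out.iterator();
    }
}
```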
But now I need the k items with the lowest Double values, so I implemented a TupleComparator and invoked takeOrdered:
List<Tuple2<Long, Double>> groupList = candidates.takeOrdered(k, new TupleComparator());
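A simplified local model of how Spark's takeOrdered works internally: each partition keeps only its k best candidates in a bounded heap, and the driver merges those small lists and sorts them. The sketch below reproduces that idea in plain Java so it runs without Spark. Map.Entry stands in for scala.Tuple2, and ScoreComparator is a hypothetical stand-in for the TupleComparator above. In a real Spark job, the comparator passed to takeOrdered must also implement java.io.Serializable, because it is shipped to the executors.

```java
import java.io.Serializable;
import java.util.*;

public class TakeOrderedSketch {

    // Hypothetical stand-in for TupleComparator: orders (id, score) pairs by
    // ascending score. Serializable, as Spark requires for takeOrdered/top.
    public static class ScoreComparator
            implements Comparator<Map.Entry<Long, Double>>, Serializable {
        @Override
        public int compare(Map.Entry<Long, Double> a, Map.Entry<Long, Double> b) {
            return Double.compare(a.getValue(), b.getValue());
        }
    }

    // Per-partition step: keep the k smallest items in a max-heap of size k.
    static <T> PriorityQueue<T> smallestK(Iterator<T> it, int k, Comparator<T> cmp) {
        PriorityQueue<T> heap = new PriorityQueue<>(Math.max(1, k), cmp.reversed());
        while (it.hasNext()) {
            T item = it.next();
            if (heap.size() < k) {
                heap.add(item);
            } else if (k > 0 && cmp.compare(item, heap.peek()) < 0) {
                heap.poll();      // drop the current worst of the k kept
                heap.add(item);
            }
        }
        return heap;
    }

    // Driver step: merge the per-partition heaps and return the k smallest, sorted.
    public static <T> List<T> takeOrdered(List<List<T>> partitions, int k, Comparator<T> cmp) {
        List<T> merged = new ArrayList<>();
        for (List<T> part : partitions) {
            merged.addAll(smallestK(part.iterator(), k, cmp));
        }
        merged.sort(cmp);
        return new ArrayList<>(merged.subList(0, Math.min(k, merged.size())));
    }

    public static void main(String[] args) {
        // Two "partitions" of (id, score) pairs.
        List<List<Map.Entry<Long, Double>>> partitions = List.of(
                List.of(Map.entry(1L, 0.9), Map.entry(2L, 0.1), Map.entry(3L, 0.5)),
                List.of(Map.entry(4L, 0.3), Map.entry(5L, 0.7)));
        List<Map.Entry<Long, Double>> top = takeOrdered(partitions, 2, new ScoreComparator());
        for (Map.Entry<Long, Double> e : top) {
            System.out.println(e.getKey()); // prints 2, then 4
        }
    }
}
```

Because each partition contributes at most k pairs, the driver only ever receives about k times the number of partitions small tuples, regardless of the dataset size.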
At this point the warning appears. The same happens when invoking top(). Since the algorithm is iterative, it appears in every iteration. I'm not showing you the rest of the code; you guessed it... it's more than trivial :-)
Also, I persist my RDDs properly, and the jobs seem to work like a charm. I've found that lots of people have this problem, but there is no answer.
I hope I've given you enough details; this is my first post!