
I would like to split my Spark JavaRDD into multiple RDDs and collect them into a list.

What I mean is:

RDD -> [1,2,3,4,5,6,7,8,9,10]

List -> [[1,2,3],[4,5,6],[7,8,9],[10]]
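To make the intent concrete, this is ordinary list chunking when done locally. A minimal driver-side sketch (assuming the data fits in a local list; the helper name `chunk` is just for illustration):

```java
import java.util.ArrayList;
import java.util.List;

public class ChunkSketch {
    // Splits a list into consecutive chunks of at most `size` elements,
    // e.g. [1..10] with size 3 -> [[1,2,3],[4,5,6],[7,8,9],[10]].
    static <T> List<List<T>> chunk(List<T> input, int size) {
        List<List<T>> chunks = new ArrayList<>();
        for (int start = 0; start < input.size(); start += size) {
            int end = Math.min(start + size, input.size());
            // Copy the view so each chunk is independent of the source list.
            chunks.add(new ArrayList<>(input.subList(start, end)));
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<Integer> data = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        System.out.println(chunk(data, 3)); // [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10]]
    }
}
```

The question is how to get the same grouping when the data lives in an RDD rather than a local list.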

To do so, I wrote the following code:

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import java.util.ArrayList;
import java.util.List;

public class SplitRdd {
    public static void main(String[] args) {
        SparkConf configuration = new SparkConf().setAppName("splitRdd").setMaster("local[*]");
        SparkContext sc = new SparkContext(configuration);
        sc.setLogLevel("ERROR");

        JavaSparkContext javaSparkContext = new JavaSparkContext(sc);

        ArrayList<Integer> integers = new ArrayList<>();
        int integerRange = 200;

        for (int i = 0; i < integerRange; i++) {
            integers.add(i);
        }

        JavaRDD<Integer> rdd = javaSparkContext.parallelize(integers);


        List<JavaRDD<Integer>> javaRDDS = splitRDD(rdd, 10, javaSparkContext);

        System.out.println("Size of the list " + javaRDDS.size());
        javaRDDS.forEach(currentRdd -> System.out.println(currentRdd.sortBy((Function<Integer, Object>) integer -> integer, true, 1).collect()));

    }

    private static <T> List<JavaRDD<T>> splitRDD(JavaRDD<T> rdd, int rowInEachRdd, JavaSparkContext javaSparkContext) {
        List<JavaRDD<T>> rdds = new ArrayList<>();

        JavaRDD<T> rddBuilder = rdd;

        // Cache before counting so the later take()/subtract() calls reuse the data.
        long numberOfIteration = (rddBuilder.cache().count() / rowInEachRdd) + 1;

        for (int i = 0; i < numberOfIteration; i++) {
            // Take the next chunk and turn it into its own RDD...
            List<T> take = rddBuilder.take(rowInEachRdd);

            JavaRDD<T> newRdd = javaSparkContext.parallelize(take);

            // ...then remove those elements from the remainder.
            rddBuilder = rddBuilder.subtract(newRdd);

            System.out.format("Iteration %d on %d\n", i, numberOfIteration);

            rdds.add(newRdd);
        }

        return rdds;
    }
}

This code behaves as expected, and I get the following output in local mode:

Iteration 0 on 21
Iteration 1 on 21
Iteration 2 on 21
Iteration 3 on 21
Iteration 4 on 21
Iteration 5 on 21
Iteration 6 on 21
Iteration 7 on 21
Iteration 8 on 21
Iteration 9 on 21
Iteration 10 on 21
Iteration 11 on 21
Iteration 12 on 21
Iteration 13 on 21
Iteration 14 on 21
Iteration 15 on 21
Iteration 16 on 21
Iteration 17 on 21
Iteration 18 on 21
Iteration 19 on 21
Iteration 20 on 21
Size of the list 21
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
[16, 24, 64, 72, 80, 88, 128, 136, 144, 192]
[32, 40, 48, 96, 104, 112, 152, 160, 168, 176]
[33, 56, 65, 73, 97, 120, 129, 161, 184, 193]
[17, 41, 49, 81, 105, 113, 137, 145, 169, 177]
[25, 34, 57, 66, 89, 121, 130, 153, 185, 194]
[10, 18, 42, 74, 82, 98, 106, 138, 162, 170]
[26, 50, 58, 90, 114, 122, 146, 154, 178, 186]
[11, 35, 43, 67, 75, 99, 131, 139, 163, 195]
[19, 27, 51, 83, 91, 107, 115, 147, 171, 179]
[36, 59, 68, 100, 123, 132, 155, 164, 187, 196]
[12, 20, 44, 52, 76, 84, 108, 140, 148, 172]
[28, 60, 69, 92, 116, 124, 133, 156, 180, 188]
[13, 37, 45, 77, 101, 109, 141, 165, 173, 197]
[21, 29, 53, 61, 85, 93, 117, 149, 157, 181]
[14, 38, 70, 78, 102, 125, 134, 166, 189, 198]
[22, 46, 54, 86, 110, 118, 142, 150, 174, 182]
[30, 39, 62, 71, 94, 126, 135, 158, 190, 199]
[15, 23, 47, 79, 87, 103, 111, 143, 167, 175]
[31, 55, 63, 95, 119, 127, 151, 159, 183, 191]
[]

The problem is that when I run this in yarn-client mode on my real cluster with real (larger) data, I get:

java.lang.OutOfMemoryError: GC overhead limit exceeded

Why? I think the code is failing on the subtract. (I don't have access to the logs anymore, sorry.)
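For reference, a subtract-free way to get the same chunks would be to index each element and select by `index / size`, which in Spark terms would be `rdd.zipWithIndex()` followed by one `filter` per chunk (I have not tested that on the cluster). Here is the idea on a plain list, with the Spark predicate shown in a comment:

```java
import java.util.ArrayList;
import java.util.List;

public class IndexSplit {
    // Chunking by element index. In Spark this would roughly be, per chunkId:
    //   rdd.zipWithIndex().filter(t -> t._2() / size == chunkId).keys()
    // so no subtract (and no growing lineage chain) is needed.
    static <T> List<List<T>> splitByIndex(List<T> data, int size) {
        int numChunks = (data.size() + size - 1) / size; // ceiling division
        List<List<T>> chunks = new ArrayList<>();
        for (int chunkId = 0; chunkId < numChunks; chunkId++) {
            List<T> chunk = new ArrayList<>();
            for (int i = 0; i < data.size(); i++) {
                if (i / size == chunkId) { // the same predicate a Spark filter would use
                    chunk.add(data.get(i));
                }
            }
            chunks.add(chunk);
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<Integer> data = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        System.out.println(splitByIndex(data, 3)); // [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10]]
    }
}
```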

  • http://spark.apache.org/docs/latest/tuning.html#garbage-collection-tuning – vaquar khan Jun 30 '18 at 17:55
  • @khan, this is what I don't understand. At each iteration of my loop I take only a small part of my RDD. When the iteration is over, shouldn't the GC have freed the memory allocated for the list? – Omegaspard Jul 01 '18 at 05:55
