
My dataset has ~20 million rows and takes ~8 GB of RAM. I'm running the job with 2 executors, 10 GB of RAM per executor, and 2 cores per executor. Because of further transformations, the data should be cached all at once.
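For context, the caching step mentioned above could look roughly like this (just a sketch; the MEMORY_AND_DISK storage level is an assumption, chosen so the ~8 GB dataset can spill to disk if the 10 GB executors run short of memory):

import org.apache.spark.storage.StorageLevel;

// Cache the dataset once so the follow-up transformations reuse it
// instead of recomputing it from the source.
Dataset<Row> cached = dataSet.persist(StorageLevel.MEMORY_AND_DISK());
cached.count(); // optionally materialize the cache eagerly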

I need to remove duplicates based on 4 fields (any one of the duplicates can be kept). Two options: using groupBy, or using repartition and mapPartitions. The second approach lets you specify the number of partitions and could therefore perform faster in some cases, right?

Could you please explain which option has better performance? Do both options have the same RAM consumption?

Using groupBy

dataSet
    .groupBy(col1, col2, col3, col4)
    .agg(
        last(col5),
        ...
        last(col17)
    );

Using repartition and mapPartitions

dataSet.sqlContext().createDataFrame(
    dataSet
        .repartition(parallelism, seq(asList(col1, col2, col3, col4)))
        .toJavaRDD()
        .mapPartitions(DatasetOps::reduce),
    SCHEMA
);

private static Iterator<Row> reduce(Iterator<Row> itr) {
    // Order rows by the four key columns so that duplicates compare as equal.
    Comparator<Row> comparator = (row1, row2) -> Comparator
        .comparing((Row r) -> r.getAs(name(col1)))
        .thenComparing((Row r) -> r.getAs(name(col2)))
        .thenComparingInt((Row r) -> r.getAs(name(col3)))
        .thenComparingInt((Row r) -> r.getAs(name(col4)))
        .compare(row1, row2);

    // Collect the partition into a TreeSet keyed by the comparator, which keeps
    // a single row per key, then return the survivors as a list iterator.
    List<Row> list = StreamSupport
        .stream(Spliterators.spliteratorUnknownSize(itr, Spliterator.ORDERED), false)
        .collect(collectingAndThen(toCollection(() -> new TreeSet<>(comparator)), ArrayList::new));

    return list.iterator();
}

1 Answer


The second approach lets you specify the number of partitions and could therefore perform faster in some cases, right?

Not really. Both approaches allow you to specify the number of partitions - in the first case through spark.sql.shuffle.partitions:

spark.conf.set("spark.sql.shuffle.partitions", parallelism)
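In the Java API used in the question, the same setting can be applied through the SQLContext handle the snippet already uses (a minimal equivalent, assuming parallelism holds the desired partition count):

dataSet.sqlContext().setConf("spark.sql.shuffle.partitions", String.valueOf(parallelism));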

However, the second approach is inherently less efficient when duplicates are common, because it shuffles first and reduces later, skipping map-side reduction (in other words, it is yet another group-by-key). If duplicates are rare, though, it won't make much of a difference.
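One way to see this is to compare the physical plans of the two variants (a sketch; the column identifiers are the ones from the question, and the exact operator names depend on the Spark version):

// groupBy/agg: the plan typically shows a partial aggregate before the
// Exchange and a final aggregate after it, i.e. map-side reduction.
dataSet.groupBy(col1, col2, col3, col4).agg(last(col5)).explain();

// repartition + mapPartitions: only an Exchange of the full rows, so every
// duplicate is shuffled before it can be reduced.
dataSet.repartition(parallelism, col1, col2, col3, col4).explain();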

On a side note, Dataset already provides dropDuplicates variants that take a set of columns, and first / last is not particularly meaningful here (see the discussion in How to select the first row of each group?).
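For completeness, the built-in variant could look like this (a sketch using the String[] overload; the quoted names stand in for the real column names behind col1 ... col4):

Dataset<Row> deduplicated = dataSet.dropDuplicates(new String[] {"col1", "col2", "col3", "col4"});

As with first / last without an explicit ordering, which row survives per key is not deterministic.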
