
I'm currently running some experiments with Spark to better understand the performance I can expect when working with highly cascaded queries.

It seems to me that invoking persist() (or cache()) on intermediate results causes my execution time to increase exponentially.

Consider this minimal example:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder()
        .appName(getClass().getName())
        .master("local[*]")
        .getOrCreate();

Dataset<Row> ds = spark.range(1).toDF();
for (int i = 1; i < 200; i++) {
    // No-op projection: replaces the "id" column with itself,
    // adding one more Project node to the logical plan per iteration.
    ds = ds.withColumn("id", ds.col("id"));

    ds.cache();

    long t0 = System.currentTimeMillis();
    long cnt = ds.count();
    long t1 = System.currentTimeMillis();

    System.out.println("Iteration " + String.format("%3d", i)
            + " count: " + cnt + " time: " + (t1 - t0) + "ms");
}

Without the ds.cache() in the code, the time for count() is rather constant. With ds.cache(), however, execution time starts to increase exponentially:

iteration   without cache() (ms)   with cache() (ms)
      ...                    ...                 ...
       24                     61                 297
       25                     74                 515
       26                     86               1,036
       27                     78               1,904
       28                     73               3,233
       29                     79               6,815
       30                     75              12,549
       31                    107              26,379
       32                     69              46,207
       33                     54             102,172

Any idea what is going on here? From what I understand of how .persist() works, this doesn't make much sense.

Thanks,
Thorsten


1 Answer

  • Caching is not a free lunch. It requires expensive state management, possible cache eviction (the default storage level for Dataset is MEMORY_AND_DISK, so in case of eviction data is written to disk), and possibly garbage collection cycles.

    See the related question Spark: Why do I have to explicitly tell what to cache?

  • In the presented scenario caching is completely useless and your code doesn't measure anything.

    • Since all you do is count, all expressions generated using withColumn are eliminated from the execution plan, unless prohibited by cache.
    • If it were forced to evaluate (foreach(_ => ()) is one approach; a Java sketch follows below), all operations could be merged into a single stage, as previous stages are overwritten and eliminated from the plan, so cache has no value here.
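
A minimal Java sketch of that foreach approach, assuming the ds from the question's loop (the no-op ForeachFunction plays the role of Scala's foreach(_ => ())):

import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.sql.Row;

// Force every row to be evaluated without collecting anything to the
// driver. Unlike count(), this keeps Spark from pruning the withColumn
// projections out of the optimized plan.
ds.foreach((ForeachFunction<Row>) row -> { });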
Alper t. Turker
  • Following your advice I compared the plans for the scenario with and without caching. The logical plans are of course identical: a long list of Projects on top of the Range definition. The optimized plan shows what you predicted: without caching it's trivial; with caching enabled there is an InMemoryTableScan for each Project. So why is this happening? The InMemoryTableScan is obviously the right thing to do, but why does it happen recursively for each level? Once, for the topmost cached intermediate result, should be sufficient, shouldn't it? – Thorsten Feb 16 '18 at 21:26
  • I also tried your second suggestion and replaced long cnt = ds.count(); with long cnt = new ArrayList<>(ds.collectAsList()).size();. While this is obviously not a feasible solution for a real-world application, interestingly enough the execution time no longer increases exponentially. The plans are the same as before. How can retrieving all values be faster than simply doing a count? – Thorsten Feb 16 '18 at 21:26
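
For anyone who wants to reproduce the plan comparison from these comments, a minimal sketch using Spark's built-in explain, assuming the same ds as in the question:

// Print the parsed logical, analyzed, optimized, and physical plans.
// With cache() enabled, each cached intermediate level shows up as an
// extra InMemoryTableScan in the optimized plan; without it, the plan
// collapses to a single projection over the Range.
ds.explain(true);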