
This refers to an earlier question on the difference between saveAsTable and insertInto.

What is the difference between the following two approaches:

df.write.saveAsTable("mytable");

and

df.createOrReplaceTempView("my_temp_table");
spark.sql("drop table if exists my_temp_table");
spark.sql("create table mytable as select * from my_temp_table");

In which case is the table stored in memory, and in which case physically on disk?

Also, as per my understanding, createOrReplaceTempView only registers the DataFrame (already in memory) to make it accessible through Hive queries, without actually persisting it. Is that correct?

I have to join hundreds of tables and I hit an OutOfMemory issue. In terms of efficiency, what would be the best way?

  • df.persist() and df.join(..).join(..).join(..).... #hundred joins

  • createOrReplaceTempView then join with spark.sql(),

  • saveAsTable (? not sure about the next step)

  • Write to disk with Create Table then join with spark.sql()?


1 Answer


Let's go step-by-step.

In the case of df.write.saveAsTable("mytable"), the table is actually written to storage (HDFS/S3). It is a Spark action.
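For example, a minimal PySpark sketch (assuming a SparkSession named spark with Hive support enabled; the DataFrame here is a toy one for illustration):

from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()
df = spark.range(10)  # toy DataFrame, just for illustration

# saveAsTable is eager: it runs the full computation of df and
# materializes the result as a managed table in the warehouse.
df.write.mode("overwrite").saveAsTable("mytable")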

On the other hand, df.createOrReplaceTempView("my_temp_table") is a transformation: it merely registers an identifier for the DAG of df. Nothing is actually stored in memory or on disk.

spark.sql("drop table if exists " + my_temp_table) drops the table.

spark.sql("create table mytable as select * from my_temp_table") creates mytable on storage. createOrReplaceTempView creates tables in global_temp database.

It would be best to modify the query to:

create table mytable as select * from global_temp.my_temp_table
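To make the two view scopes concrete, a small sketch (the names my_global_view and mytable2 are hypothetical):

# Session-scoped view: visible only inside this SparkSession and
# referenced without any database prefix.
df.createOrReplaceTempView("my_temp_table")
spark.sql("create table mytable as select * from my_temp_table")

# Global view: shared across sessions of the same application and
# stored in the reserved global_temp database.
df.createGlobalTempView("my_global_view")
spark.sql("create table mytable2 as select * from global_temp.my_global_view")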

createOrReplaceTempView only registers the DataFrame (already in memory) to make it accessible through Hive queries, without actually persisting it. Is that correct?

Yes. For large DAGs, Spark will automatically cache data depending on the spark.memory.fraction setting; see the memory-management section of the Spark configuration docs for details.
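For reference, spark.memory.fraction (default 0.6) controls the share of the heap used for execution and storage. A sketch of where it is set; the value shown is only an example, and it has to be configured before the session starts:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.memory.fraction", "0.7")  # example value, not a recommendation
    .getOrCreate()
)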

I have to join hundreds of tables and I hit an OutOfMemory issue. In terms of efficiency, what would be the best way?

  • df.persist() and df.join(..).join(..).join(..).... #hundred joins

  • createOrReplaceTempView then join with spark.sql(),

  • saveAsTable (? not sure about the next step)

  • Write to disk with Create Table then join with spark.sql()?

persist would store some data in cached format depending on the available memory, but for a final table that is generated by joining hundreds of tables, this is probably not the best approach.

It is not possible to suggest the one approach that will work for you, but here are some general patterns:

If writes fail with OOM and the default spark.sql.shuffle.partitions (200) is in use, then the starting point is to increase the shuffle partition count so that each partition on an executor is sized to fit its available memory.

The spark.sql.shuffle.partitions setting can be changed between different joins; it does not need to be constant across the whole Spark job, as sketched below.
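A sketch of changing it mid-job; the partition counts and the names big_df1, big_df2, small_df, stage1, stage2 are illustrative, and an action is needed in between for the first setting to actually take effect:

# Many partitions while joining two large tables; materializing the
# result forces this part of the plan to run under this setting.
spark.conf.set("spark.sql.shuffle.partitions", 2000)
big_df1.join(big_df2, "id").write.mode("overwrite").saveAsTable("stage1")

# Fewer partitions for the next, smaller join.
spark.conf.set("spark.sql.shuffle.partitions", 200)
spark.table("stage1").join(small_df, "id").write.mode("overwrite").saveAsTable("stage2")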

Calculating partition sizes becomes difficult when multiple tables are involved. In that case, writing intermediate results to disk and reading them back before joining the large tables is a good idea.
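A sketch of that write-and-read-back pattern, which truncates the lineage between join stages (the paths and DataFrame names are made up):

# Materialize the intermediate joins; the write is an action, so the
# plan up to this point is executed once and then discarded.
df_a.join(df_b, "id").join(df_c, "id").write.mode("overwrite").parquet("/tmp/stage1")

# Reading it back starts a fresh, short lineage: the next join is
# truly "stage1 join big_df", not a replay of all the earlier joins.
stage1 = spark.read.parquet("/tmp/stage1")
final = stage1.join(big_df, "id")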

For small tables (less than about 2 GB), broadcasting is a possibility. The default threshold, spark.sql.autoBroadcastJoinThreshold, is 10 MB, but it can be raised.
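A sketch of the two ways to broadcast (the threshold value is just an example, and big_df/small_df are placeholders):

from pyspark.sql.functions import broadcast

# Option 1: raise the automatic broadcast threshold (in bytes).
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 100 * 1024 * 1024)

# Option 2: hint a specific join; small_df is shipped to every
# executor, so the large side avoids a shuffle.
result = big_df.join(broadcast(small_df), "id")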

It would be best if the final table is stored on disk, rather than serving Thrift clients through temp views.

Good luck!

  • Thanks for the insight, Sai. Do you mean that _SaveAsTable_ and _Create Table_ are the same, both writing to HDFS? The issue I have is better described at https://stackoverflow.com/questions/55656759/add-column-from-one-dataframe-to-another-without-join?noredirect=1#comment98075663_55656759 . It's typical feature processing, with each feature spawning a small table of 5-10 columns, all of them joined at the end. My limit at the moment before OutOfMemory is 500k rows and 200 features (spawning 500+ columns). I tried both splitting into chunks, (join20)join(join20), and left to right, ((join20)join20). – Kenny Apr 16 '19 at 13:43
  • @Kenny, `Do you mean that SaveAsTable and Create Table are the same, both writing to HDFS?` Yes. `I tried both splitting into chunks: (join20)join(join20) and left to right ((join20)join20)` This would not work, because joins are just transformations and the Spark plan flattens all the joins however they are grouped. I imagine the idea behind splitting the joins is to reduce the DAG (?), in which case you might want to save the joined tables to disk before attempting the bigger joins. – Sai Apr 16 '19 at 14:31
  • For eg: `A join B` = C, `C join D` = E. Let's assume E is the table you are interested in and D is a big table (because of which OOMs occur). With a Spark action like `saveAsTable`, or if you register E as a temp view and query it through Thrift, the plan would still be `A join B join D`, which means the DAG is not shortened. But if C is saved by writing it to disk (which is an action), the next join will truly be `C join D`. – Sai Apr 16 '19 at 14:35