38

I'm trying to write a DataFrame into Hive table (on S3) in Overwrite mode (necessary for my application) and need to decide between two methods of DataFrameWriter (Spark / Scala). From what I can read in the documentation, df.write.saveAsTable differs from df.write.insertInto in the following respects:

  • saveAsTable uses column-name based resolution while insertInto uses position-based resolution
  • In Append mode, saveAsTable pays more attention to underlying schema of the existing table to make certain resolutions

Overall, it gives me the impression that saveAsTable is just a smarter version of insertInto. Alternatively, depending on use-case, one might prefer insertInto

But do each of these methods come with some caveats of their own like performance penalty in case of saveAsTable (since it packs in more features)? Are there any other differences in their behaviours apart from what is told (not very clearly) in the docs?


EDIT-1

Documentation says this regarding insertInto

Inserts the content of the DataFrame to the specified table

and this for saveAsTable

In the case the table already exists, behavior of this function depends on the save mode, specified by the mode function

Now I can list my doubts

  • Does insertInto always expect the table to exist?
  • Do SaveModes have any impact on insertInto?
  • If above answer is yes, then
    • what's the differences between saveAsTable with SaveMode.Append and insertInto given that table already exists?
    • does insertInto with SaveMode.Overwrite make any sense?
y2k-shubham
  • 10,183
  • 11
  • 55
  • 131
  • All the QAs / links I've come across complain that Spark intrinsically overwrites all partitions (in Overwrite mode) and that some tricks are needed to bypass this rather inhibiting shortcoming – y2k-shubham Dec 18 '17 at 05:13

5 Answers5

36

DISCLAIMER I've been exploring insertInto for some time and although I'm far from an expert in this area I'm sharing the findings for greater good.

Does insertInto always expect the table to exist?

Yes (per the table name and the database).

Moreover not all tables can be inserted into, i.e. a (permanent) table, a temporary view or a temporary global view are fine, but not:

  1. a bucketed table

  2. an RDD-based table

Do SaveModes have any impact on insertInto?

(That's recently been my question, too!)

Yes, but only SaveMode.Overwrite. After you think about insertInto the other 3 save modes don't make much sense (as it simply inserts a dataset).

what's the differences between saveAsTable with SaveMode.Append and insertInto given that table already exists?

That's a very good question! I'd say none, but let's see by just one example (hoping that proves something).

scala> spark.version
res13: String = 2.4.0-SNAPSHOT

sql("create table my_table (id long)")
scala> spark.range(3).write.mode("append").saveAsTable("my_table")
org.apache.spark.sql.AnalysisException: The format of the existing table default.my_table is `HiveFileFormat`. It doesn't match the specified format `ParquetFileFormat`.;
  at org.apache.spark.sql.execution.datasources.PreprocessTableCreation$$anonfun$apply$2.applyOrElse(rules.scala:117)
  at org.apache.spark.sql.execution.datasources.PreprocessTableCreation$$anonfun$apply$2.applyOrElse(rules.scala:76)
...
scala> spark.range(3).write.insertInto("my_table")
scala> spark.table("my_table").show
+---+
| id|
+---+
|  2|
|  0|
|  1|
+---+

does insertInto with SaveMode.Overwrite make any sense?

I think so given it pays so much attention to SaveMode.Overwrite. It simply re-creates the target table.

spark.range(3).write.mode("overwrite").insertInto("my_table")
scala> spark.table("my_table").show
+---+
| id|
+---+
|  1|
|  0|
|  2|
+---+

Seq(100, 200, 300).toDF.write.mode("overwrite").insertInto("my_table")
scala> spark.table("my_table").show
+---+
| id|
+---+
|200|
|100|
|300|
+---+
Jacek Laskowski
  • 72,696
  • 27
  • 242
  • 420
  • 1
    Hi Jacek, do you have any theory why saveAsTable behaves like this re: org.apache.spark.sql.AnalysisException? – Roman Jan 22 '19 at 22:48
  • 1
    Looks like only Hive tables that Spark creates (ParquetFileFormat) are supported? – Jacek Laskowski Jan 23 '19 at 11:00
  • is there a particular reason you used `insertInto` instead of `saveAsTable` in your example? Would you expect them to behave differently? – nimish May 16 '19 at 20:46
  • The only reason I used `insertInto` instead of `saveAsTable` in my example was that the question specifically asked about the behaviour of `insertInto` per `SaveMode` / `mode`. – Jacek Laskowski May 17 '19 at 05:48
  • actually i want to write into a partitioned table for the first time in append mode , its failing with the same format issue . df.write.partitionBy("code").mode("append").format("orc").saveAsTable("test.t1") – Aviral Kumar May 15 '20 at 12:45
  • @AviralKumar are you able to solve the format issue? If yes, please tell how. – SrinR Jul 30 '20 at 05:35
  • @JacekLaskowski Tanks for your contribution ;-). One question...When you work with an external Hive Metastore... saveAsTable (in overwrite mode) drops the table and partitions in Hive metastore, right? Does this make it more efficient work with insert Into to avoid msck repair, refresh table etc...? Thanks a lot !! – Christian Herrera Jiménez Oct 29 '20 at 08:39
  • @ChristianHerreraJiménez I've got no idea (and would not be surprised to find out that it varies version by version). You'd simply be better off checking it out yourself by comparing query plans (and perhaps looking at the source code a bit). – Jacek Laskowski Oct 29 '20 at 10:20
16

I want to point out a major difference between SaveAsTable and insertInto in SPARK.

In partitioned table overwrite SaveMode work differently in case of SaveAsTable and insertInto.

Consider below example.Where I am creating partitioned table using SaveAsTable method.

hive> CREATE TABLE `db.companies_table`(`company` string) PARTITIONED BY ( `id` date);
OK
Time taken: 0.094 seconds
import org.apache.spark.sql._*
import spark.implicits._
import org.apache.spark.sql._

scala>val targetTable = "db.companies_table"

scala>val companiesDF = Seq(("2020-01-01", "Company1"), ("2020-01-02", "Company2")).toDF("id", "company")

scala>companiesDF.write.mode(SaveMode.Overwrite).partitionBy("id").saveAsTable(targetTable)

scala> spark.sql("select * from db.companies_table").show()
+--------+----------+
| company|        id|
+--------+----------+
|Company1|2020-01-01|
|Company2|2020-01-02|
+--------+----------+

Now I am adding 2 new rows with 2 new partition values.

scala> val companiesDF = Seq(("2020-01-03", "Company1"), ("2020-01-04", "Company2")).toDF("id", "company")

scala> companiesDF.write.mode(SaveMode.Append).partitionBy("id").saveAsTable(targetTable)

scala>spark.sql("select * from db.companies_table").show()

+--------+----------+                                                           
| company|        id|
+--------+----------+
|Company1|2020-01-01|
|Company2|2020-01-02|
|Company1|2020-01-03|
|Company2|2020-01-04|
+--------+----------+

As you can see 2 new rows are added to the table.

Now let`s say i want to Overwrite partition 2020-01-02 data.

scala> val companiesDF = Seq(("2020-01-02", "Company5")).toDF("id", "company")

scala>companiesDF.write.mode(SaveMode.Overwrite).partitionBy("id").saveAsTable(targetTable)

As per our logic only partitions 2020-01-02 should be overwritten but the case with SaveAsTable is different.It will overwrite the enter table as you can see below.

scala> spark.sql("select * from db.companies_table").show()
+-------+----------+
|company|        id|
+-------+----------+
|Company5|2020-01-02|
+-------+----------+

So if we want to overwrite only certain partitions in the table using SaveAsTable its not possible.

Refer this Link for more details. https://towardsdatascience.com/understanding-the-spark-insertinto-function-1870175c3ee9

Sandeep Khot
  • 301
  • 3
  • 5
  • 6
    Take note that this only happened for managed table. External table can use the config `spark.sql.sources.partitionOverwriteMode` set to dynamic and then refresh the table (`msck repair table ..`) – Joshua H Aug 16 '20 at 18:23
  • 1
    Yes @Joshua H. I wanted to update partitions in an external table and I can do that using ```df.write.mode("overwrite").partitionBy('') .option("partitionOverwriteMode", "dynamic").option('path', '') .saveAsTable(op_result_table)``` – 7bStan Jan 24 '23 at 10:18
5

I recently started converting my Hive Scripts to Spark and I am still learning.

There is one important behavior I noticed with saveAsTable and insertInto which has not been discussed.

df.write.mode("overwrite").saveAsTable("schema.table") drops the existing table "schema.table" and recreates a new table based on the 'df' schema. The schema of the existing table becomes irrelevant and does not have to match with df. I got bitten by this behavior since my existing table was ORC and the new table created was parquet (Spark Default).

df.write.mode("overwrite").insertInto("schema.table") does not drop the existing table and expects the schema of the existing table to match with the schema of 'df'.

I checked the Create Time for the table using both options and reaffirmed the behavior.

Original Table stored as ORC - Wed Sep 04 21:27:33 GMT 2019

After saveAsTable (storage changed to Parquet) - Wed Sep 04 21:56:23 GMT 2019 (Create Time changed)

Dropped and Recreated origina table (ORC) - Wed Sep 04 21:57:38 GMT 2019

After insertInto (Still ORC) - Wed Sep 04 21:57:38 GMT 2019 (Create Time Not changed)

ZeroDecibels
  • 115
  • 1
  • 5
2

Here is the overall differences in summary table.

enter image description here

Mohana B C
  • 5,021
  • 1
  • 9
  • 28
1

Another important point that I do consider while inserting data into an EXISTING Hive dynamic partitioned table from spark 2.xx :

df.write.mode("append").insertInto("dbName"."tableName")

Above command will intrinsically map the data in your "df" and append only new partitions to existing table.

Hope, it adds another point in deciding when to use "insertInto".

anshuman sharma
  • 121
  • 1
  • 4