
I'm running Spark 3.3.1 in standalone cluster mode, using Hadoop 3.3.4 (HDFS) for storage. I'm attempting to run a large script that reuses a permanent table as a scratch/temp table at different stages. Here's a rough outline of how the table is used:

DROP TABLE IF EXISTS AiTemp;

CREATE TABLE AiTemp AS
SELECT *
FROM SomeTable;

-- Do some work with the table

-- Drop the table
DROP TABLE IF EXISTS AiTemp;


-- Some unrelated code does some unrelated things here
-- Later on in the script, I reuse the table for an unrelated purpose


DROP TABLE IF EXISTS AiTemp;

CREATE TABLE AiTemp AS
SELECT *
FROM SomeOtherTable;

-- Do some different work with the table

--Drop the table
DROP TABLE IF EXISTS AiTemp;

Even though I drop the table each time after I am finished with it, I'm still getting a caching error:

22/11/09 20:15:20 WARN TaskSetManager: Lost task 27.1 in stage 230.0 (TID 2612) (10.0.11.2 executor 1): java.io.FileNotFoundException:
File does not exist: hdfs://sparkmaster:9000/user/spark/warehouse/aitemp/part-00027-7640da4d-0c27-484e-b2ca-7bd20ed86371-c000.snappy.parquet

It is possible the underlying files have been updated. You can explicitly invalidate
the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by
recreating the Dataset/DataFrame involved.

        at org.apache.spark.sql.errors.QueryExecutionErrors$.readCurrentFileNotFoundError(QueryExecutionErrors.scala:661)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:212)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:270)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:116)
        at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:554)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.columnartorow_nextBatch_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
        at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        at org.apache.spark.scheduler.Task.run(Task.scala:136)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)

The script doesn't fail after this message is shown; it continues executing. Since the script keeps running, does this mean Spark refreshed the cache and retried the task, and that the failed task eventually completed?

Additionally, what causes this message? It appears that I could put an UNCACHE TABLE IF EXISTS statement before each drop to ensure the cache is invalidated/cleared, but it doesn't feel like that should be necessary if I'm explicitly dropping the table.
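If explicit invalidation really is needed, I assume it would look something like this (UNCACHE TABLE IF EXISTS and REFRESH TABLE are both standard Spark SQL statements; this is just a sketch of what I'd try, not code I expect to need):

UNCACHE TABLE IF EXISTS AiTemp;
DROP TABLE IF EXISTS AiTemp;

CREATE TABLE AiTemp AS
SELECT *
FROM SomeTable;

-- Do some work with the table

UNCACHE TABLE IF EXISTS AiTemp;
DROP TABLE IF EXISTS AiTemp;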

EDIT: I put a bunch of UNCACHE TABLE calls before dropping AND creating the table in question, but the error persisted. I then changed my code to create the table and fill it in two separate steps, and that works... but I'd still like to understand why this happened in the first place.

Here's how the code looks now:

DROP TABLE IF EXISTS AiTemp;
CREATE TABLE AiTemp LIKE SomeTable;
INSERT INTO AiTemp
SELECT *
FROM SomeTable;

DROP TABLE IF EXISTS AiTemp;

EDIT 2: Splitting the CREATE and INSERT into separate statements doesn't fix the issue; I'm still receiving the error.

Patrick Tucci
  • Does this answer your question? [You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved](https://stackoverflow.com/questions/63731085/you-can-explicitly-invalidate-the-cache-in-spark-by-running-refresh-table-table) – Matt Andruff Nov 22 '22 at 14:37
  • @MattAndruff no, because the table doesn't exist when I receive the error. I can't refresh the table cache when the code that causes the error is a CTAS statement. – Patrick Tucci Nov 23 '22 at 15:37

1 Answer


Let Spark do the cleanup for you.

Use

df.createTempView('AiTemp')
df2.createTempView('AiTemp2')

And don't worry about the cleanup.
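If the scripts need to stay in SQL rather than move to the DataFrame API, the same idea can be written with Spark SQL's temporary-view syntax. This is just a sketch using the table names from the question; a temporary view is only a named query for the current session and isn't materialized to the warehouse, so there are no parquet files to go stale:

CREATE OR REPLACE TEMPORARY VIEW AiTemp AS
SELECT *
FROM SomeTable;

-- Do some work with the view

CREATE OR REPLACE TEMPORARY VIEW AiTemp AS
SELECT *
FROM SomeOtherTable;

-- Do some different work with the view

The view disappears automatically when the session ends, so there's nothing to drop.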

Matt Andruff
  • The codebase has to be as close to ANSI SQL compatible as possible, so any other Spark-compliant languages are out. And really, Spark SQL should be taking care of this for me already. Dropping a table should clear the cache. And if it doesn't, creating a new table with the same name should refresh it. – Patrick Tucci Nov 24 '22 at 17:28
  • Why are you reusing names if it's different data? (Or not maintaining the table if it's the same data?) – Matt Andruff Nov 25 '22 at 14:00
  • It's different data each time the table is created/used/dropped. And why should I need different table names when the table serves the same functional purpose in different scripts, just with different data? If the table is dropped at the end of the script, the name should be eligible for re-use anywhere else in the codebase without worrying about caching issues. – Patrick Tucci Nov 26 '22 at 15:29
  • Well, it might add code clarity to use a name scoped to each script, and it would be a workaround for your current issue. Those would be two reasons I might suggest changing it. – Matt Andruff Nov 28 '22 at 16:17
  • If you had 2-3 scripts, that might be feasible. My codebase has just under 700 SQL scripts as of the time of this post and is only about 30% complete. Coming up with a distinct temp table name in each script is theoretically possible, but not feasible. If there's no sensible fix to this issue, I'd rather rewrite the codebase to use CTEs and retest on all supported RDBMSs than introduce a Spark-specific hack. – Patrick Tucci Nov 29 '22 at 11:24