
After a recent upgrade to HDP 3.1, now using Spark 2.3.x instead of 2.2.x, a query like:

spark.sql("SELECT * from mydb.mytable").filter('partition_date between "202001010000" and "202001020000").write.parquet("foo.out")

sometimes fails when reading from an HDFS-backed Hive table (no object storage). Note that the underlying data (an EXTERNAL table in Hive) has a retention period, and any data older than that period is deleted. This deletion can happen while the query above is running; the deletion job runs every 5 minutes.
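To make that moving retention window concrete, what the metastore reports can be compared with what is physically on HDFS (a diagnostic sketch in spark-shell; the table and path names are the placeholders used throughout this question):

spark.sql("SHOW PARTITIONS mydb.mytable").show(false)  // partitions the metastore currently knows about
// compare with what is actually on disk right now, e.g. from a shell:
//   hdfs dfs -ls /path/to/files/on/hdfs/mydb.db/mytable
// directories older than the retention period disappear between two runs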

Even though:

PartitionFilters: [isnotnull(partition_date#3099), (partition_date#3099 >= 202001010000), (partition_date#3099 <= 202001020000)]

partition filtering (predicate pushdown) seems to be enabled, more partitions than desired are read during the initial path traversal. Since the upgrade to 2.3, Spark shows the progress of the file-directory listing in the UI. Interestingly, we always get two entries: one for the oldest available directory, and one for the lower of the two boundaries of interest:

Listing leaf files and directories for 380 paths:
/path/to/files/on/hdfs/mydb.db/mytable/partition_date=202001010000/sub_part=0, ...

Listing leaf files and directories for 7100 paths:
/path/to/files/on/hdfs/mydb.db/mytable/partition_date=201912301300/sub_part=0, ...

Notice:

  • the logged numbers of paths (380 and 7100) do not seem to match what a manual check would suggest
  • the job (sometimes) fails during the recursive listing of leaf files
  • the error message:

    File does not exist: /path/to/files/on/hdfs/mydb.db/mytable/partition_date=201912301300/sub_part=0/file_name_unique_hash_timestamp.par

How can I force Spark to list only the directories in the desired interval, and not ones outside it that might collide with the maximum data-retention window?

It looks like this is related:

– Georg Heiler

2 Answers


@meniluca is correct in the sense that there must be a mismatch between what HDFS has available and what the Hive metastore reports as available.

However, instead of utilizing views, which look a bit opaque (it is not easy to see which file paths end up included in the read operation), I prefer:

spark.read
  .option("basePath", "/path/to/mydb.db/mytable")
  .orc(
    "/path/to/mydb.db/mytable/partition_date=202001[1-2]*/*",
    "/path/to/mydb.db/mytable/partition_date=202001[3-4]*/*")

This forces Spark to list only the desired paths.
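Spelled out end to end against the original job, this looks roughly as follows (a sketch only; the glob patterns and paths are illustrative, and the table is assumed to be ORC-backed as in the snippet above):

import spark.implicits._  // for the 'partition_date column syntax; already in scope in spark-shell

// List and read only the partition directories inside the interval of
// interest. basePath keeps partition_date available as a column even
// though Spark is pointed directly at the sub-directories.
val df = spark.read
  .option("basePath", "/path/to/mydb.db/mytable")
  .orc(
    "/path/to/mydb.db/mytable/partition_date=20200101*",
    "/path/to/mydb.db/mytable/partition_date=20200102*")

// The rest of the job is unchanged; directories outside the listed
// interval are never traversed, so the retention job cannot delete
// them out from under the listing.
df.filter('partition_date.between("202001010000", "202001020000"))
  .write.parquet("foo.out")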

– Georg Heiler

Have you tried this?

spark.sql("MSCK REPAIR TABLE table_name")

It has saved my life so many times.
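For the table in this question it would be run like this (a sketch; in Spark this statement registers partition directories it finds under the table location that the metastore does not yet know about):

spark.sql("MSCK REPAIR TABLE mydb.mytable")           // pick up partition directories added on HDFS
spark.sql("SHOW PARTITIONS mydb.mytable").show(false) // verify what the metastore now sees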

Edit.

After the discussion in the comments, please try creating a view. The problem cannot be solved unless you re-run the "SELECT * FROM ..." immediately after the partitions are dropped.

Creating a view will provide you with such a workaround:

CREATE VIEW [IF NOT EXISTS] [db_name.]view_name [(column_name [COMMENT column_comment], ...) ]
  [COMMENT view_comment]
  [TBLPROPERTIES (property_name = property_value, ...)]
  AS SELECT * FROM mytable;

from Hive/LanguageManual+DDL

Then replace the table with your view. If you don't have the rights to create such a view, ask an admin to do it for you. They should accommodate this request, since it seems you are trying to solve their problem, IMO.
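A minimal sketch of that workaround from the Spark side (the view name mydb.mytable_view is made up for illustration):

// One-off: create a view that simply forwards to the external table
// (an admin can create it in Hive instead if permissions are an issue).
spark.sql("""
  CREATE VIEW IF NOT EXISTS mydb.mytable_view
  AS SELECT * FROM mydb.mytable
""")

// Consumers then read the view instead of the table.
spark.sql("SELECT * FROM mydb.mytable_view")
  .filter("partition_date between '202001010000' and '202001020000'")
  .write.parquet("foo.out")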

– meniluca
  • It is an external table. After each update (new files are written every 5 minutes), this command is run to add any missing partitions. So you think running it before listing the directories, to clean up the partitions, would help? But what if listing them takes more than 5 minutes? Then it might still crash. Besides that, an analyst might not have the permissions to write to the table, i.e. to execute this command. – Georg Heiler Jan 29 '20 at 15:41
  • Indeed, it's not the end user who is supposed to run this. After your job deletes the partitions you must either run this command or invalidate the cache in Impala. It works with external partitions too. If the number of deleted files is relatively small, it should run in a few seconds. – meniluca Jan 29 '20 at 16:08
  • No. This is exactly the problem: it is not my job deleting the partitions, but some other one (3rd-party). They are also adding new data every 5 minutes and afterwards calling this command to make the partitions available, so I would not expect this to help. – Georg Heiler Jan 29 '20 at 16:11
  • No, it won't help. The only way to read it safely is with a view. I am updating my answer. – meniluca Jan 29 '20 at 16:16
  • Interesting, why would the view solve this? I do not understand this yet. Can you explain this further? – Georg Heiler Jan 29 '20 at 16:23
  • Recalculating the updated partition list before reading, that's the issue. You do it on the fly using a view. I haven't tested it yet, but I have used views extensively in the past and was not having this issue after partitions were dropped. – meniluca Jan 30 '20 at 07:52
  • But while listing the directories Spark would do the same thing. I still do not understand why this would help. Do you mean that creating the view is somewhat similar to the repair command, in the sense that it would scan the whole table for partitions? – Georg Heiler Jan 30 '20 at 08:09
  • The view dynamically calculates the list of files to access/read every time you select from it. The problem is that if they drop partitions too frequently, there is a window of time in which this problem will appear again. Have you tried it out? – meniluca Jan 31 '20 at 13:05
  • No, I thought it was safer to get rid of Hive completely during the read process, as outlined in my own answer: one less moving part to worry about. – Georg Heiler Jan 31 '20 at 13:27