I am trying to load parquet files in the following directories:

    s3://dir1/model=m1/version=newest/versionnumber=3/scores/marketplace_id-1
    s3://dir1/model=m1/version=newest/versionnumber=3/scores/marketplace_id-2
    s3://dir1/model=m1/version=newest/versionnumber=3/scores/marketplace_id-3
    s3://dir1/model=m1/version=newest/versionnumber=3/scores/marketplace_id-4
    s3://dir1/model=m1/version=newest/versionnumber=3/scores/marketplace_id-5
    s3://dir1/model=m1/version=newest/versionnumber=3/scores/marketplace_id-6
    s3://dir1/model=m1/version=newest/versionnumber=3/scores/marketplace_id-7
    s3://dir1/model=m1/version=newest/versionnumber=3/scores/marketplace_id-8

This is what I wrote in PySpark:

    s3_bucket_location_of_data = "s3://dir1/model=m1/version=newest/versionnumber=3/scores/"
    df = spark.read.parquet(s3_bucket_location_of_data)

but I received the following error:

    Py4JJavaError: An error occurred while calling o109.parquet.
    : java.lang.AssertionError: assertion failed: Conflicting directory structures detected. Suspicious paths:
    s3://dir1/model=m1/version=newest/versionnumber=3/scores/marketplace_id-1
    s3://dir1/model=m1/version=newest/versionnumber=3/scores/marketplace_id-2
    s3://dir1/model=m1/version=newest/versionnumber=3/scores/marketplace_id-3
    s3://dir1/model=m1/version=newest/versionnumber=3/scores/marketplace_id-4
    s3://dir1/model=m1/version=newest/versionnumber=3/scores/marketplace_id-5
    s3://dir1/model=m1/version=newest/versionnumber=3/scores/marketplace_id-6
    s3://dir1/model=m1/version=newest/versionnumber=3/scores/marketplace_id-7
    s3://dir1/model=m1/version=newest/versionnumber=3/scores/marketplace_id-8

After reading other StackOverflow posts like this, I tried the following:

    base_path = "s3://dir1/"  # I also tried "s3://dir1/model=m1/version=newest/versionnumber=3/scores/", but that didn't work either
    s3_bucket_location_of_data = "s3://dir1/model=m1/version=newest/versionnumber=3/scores/"
    df = spark.read.option("basePath", base_path).parquet(s3_bucket_location_of_data)

but that returned a similar error message to the one above. I am new to Spark/PySpark and I don't know what I could be doing wrong here. Thank you in advance for your answers!

– user1330974

1 Answer

You don't need to specify the detailed path. Just load the files from the base path and filter on the partition columns:

    df = spark.read.parquet("s3://dir1")
    df = df.filter("model = 'm1' and version = 'newest' and versionnumber = 3")

The directory structure is already partitioned by three columns: model, version, and versionnumber. (The marketplace_id-N directories use a dash rather than =, so Spark does not recognize them as a partition level; pointing the reader inside scores/ is what triggers the conflicting-structures assertion.) So read from the base and filter on the partition columns; Spark will prune to the matching partition path and read only the parquet files under it.
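
If you want to confirm that the filter is applied as partition pruning (directory-level filtering) rather than a full scan, you can inspect the physical plan. A minimal sketch, assuming an active `spark` session as elsewhere in this thread:

    df = spark.read.parquet("s3://dir1")
    pruned = df.filter("model = 'm1' and version = 'newest' and versionnumber = 3")

    # The file scan node of the plan should list the three partition columns
    # under PartitionFilters, meaning only the matching directories are read.
    pruned.explain()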

– Lamanus
  • Thank you for the reply. Sorry about the delay in my response (was traveling and just got back)! Your suggestion is very interesting; however, `"s3://dir1"` has A LOT of data, so the line `df = spark.read.parquet("s3://dir1")` just fails to complete (errors out). I have given up and decided to use `union` in a `for` loop to concatenate them... – user1330974 Mar 01 '23 at 01:24
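
For reference, a minimal sketch of the `union`-in-a-`for`-loop workaround described in the comment above, assuming an active `spark` session and the eight leaf directories listed in the question. Reading each leaf path on its own means partition discovery never sees the conflicting layout (note that `model`, `version`, and `versionnumber` will not appear as columns, since they exist only as directory names that are never parsed here):

    from functools import reduce

    # The eight leaf directories, exactly as listed in the question.
    base = "s3://dir1/model=m1/version=newest/versionnumber=3/scores"
    paths = [f"{base}/marketplace_id-{i}" for i in range(1, 9)]

    # Each path is read as a plain, non-partitioned parquet directory, so the
    # conflicting-structures check never fires; `union` requires all eight
    # directories to share the same schema and column order.
    df = reduce(lambda left, right: left.union(right),
                [spark.read.parquet(p) for p in paths])

Since `DataFrameReader.parquet` accepts multiple paths, `spark.read.parquet(*paths)` should produce the same result in a single call, because each explicitly supplied path is treated as its own base directory.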