8

I would like to read multiple parquet files into a dataframe from S3. Currently, I'm using the following method to do this:

files = ['s3a://dev/2017/01/03/data.parquet',
         's3a://dev/2017/01/02/data.parquet']
df = session.read.parquet(*files)

This works if all of the files exist on S3, but I would like to pass a list of files to be loaded into a dataframe without it breaking when some of the files in the list don't exist. In other words, I would like Spark SQL to load as many of the files as it finds into the dataframe and return the result without complaining. Is this possible?

ZygD
vaer-k
  • Can you please post the entire syntax of this code? It would really help me. I am trying to download a lot of parquet files from S3: files = 's3://sbs/data/sco/alpha/2016/parquet/part-{00000 to 00178}-d5ac50e3.snappy.parquet' – Viv Jun 19 '17 at 12:31

4 Answers

18

Yes, it's possible if you change the method of specifying the input to a Hadoop glob pattern, for example:

files = 's3a://dev/2017/01/{02,03}/data.parquet'
df = session.read.parquet(files)

You can read more about patterns in the Hadoop javadoc.
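
If the list of days comes from elsewhere in your code, you can build the alternation programmatically. A small sketch, reusing the hypothetical paths from the example above:

# Build the {02,03} alternation from a list of days instead of typing it by hand.
days = ['02', '03']
files = 's3a://dev/2017/01/{%s}/data.parquet' % ','.join(days)
df = session.read.parquet(files)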

But, in my opinion, this isn't an elegant way of working with data partitioned by time (by day in your case). If you are able to rename the directories like this:

  • s3a://dev/2017/01/03/data.parquet --> s3a://dev/day=2017-01-03/data.parquet
  • s3a://dev/2017/01/02/data.parquet --> s3a://dev/day=2017-01-02/data.parquet

then you can take advantage of Spark's partitioning schema and read the data with:

from pyspark.sql.functions import col

session.read.parquet('s3a://dev/') \
    .where(col('day').between('2017-01-02', '2017-01-03'))

This way will also omit empty/non-existing directories. An additional column `day` will appear in your dataframe (it will be a string in Spark < 2.1.0 and a datetime in Spark >= 2.1.0), so you will know which directory each record comes from.
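
If you also control the job that writes this data, Spark's partitioned write produces exactly that day=YYYY-MM-DD layout. A minimal sketch, assuming a source dataframe with a `timestamp` column (that column name is an assumption):

from pyspark.sql.functions import to_date, col

# Derive a 'day' column from the assumed 'timestamp' column, then let Spark
# create the day=YYYY-MM-DD directories on write.
(df.withColumn('day', to_date(col('timestamp')))
   .write
   .partitionBy('day')
   .parquet('s3a://dev/'))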

vaer-k
Mariusz
  • 1
    It seems to me that the additional where clause will not reduce the load time. pyspark will still read the whole data first, right? – asmaier Sep 13 '17 at 13:39
  • No, spark will choose directories to read based on where clause. – Mariusz Sep 13 '17 at 13:40
  • Mmh, are you sure? In my tests that doesn't seem to be the case. I measured the execution time with and without where clause on some data of mine on S3 and the load time was basically the same. – asmaier Sep 13 '17 at 13:55
  • @asmaier Do you see in executor logs some lines indicating opening files, that do not match where clause directories? – Mariusz Sep 13 '17 at 16:51
  • It will read all the data and then apply the filter, so it's not a good approach if you already know which folders to read. – Maneesh K Bishnoi Jun 30 '22 at 07:22
  • @ManeeshKBishnoi It's not true. This code reads only the metadata, not the data. Data reading runs on filtered sources. – Mariusz Jun 30 '22 at 10:29
  • @Mariusz I just ran this and it took several minutes to finish instead of completing in seconds. From your code, `session.read.parquet('s3a://dev/')` will be executed first, as it creates the dataframe, and then the filter is applied to this df. Can you please explain where S3 stores the metadata? – Maneesh K Bishnoi Jun 30 '22 at 12:16
  • 1
    @ManeeshKBishnoi In case of parquet files, the metadata are stored in the file footer. Spark needs these to build the DF schema, that's why it reads the file footer. Usually it needs only one file and assumes others follow the same schema, unless you use `mergeSchema` option -> in this case it loads footers from all the files on this path. – Mariusz Jul 01 '22 at 13:51
  • What if your bucket contains many parquet files and you want to read only a few at a time? How would you approach this? Should I list all parquet files within the bucket and use the method mentioned in the question? – haneulkim Nov 09 '22 at 08:30
  • @haneulkim Yes, in this case you can comma-separate these, as stated in the question. – Mariusz Nov 10 '22 at 09:11
1

Can I observe that, since glob-pattern matching involves a full recursive tree walk and pattern match of the paths, it is an absolute performance killer against object stores, especially S3. There's a special shortcut in Spark to recognise when your path doesn't have any glob characters in it, in which case it makes a more efficient choice.

Similarly, a very deep partitioning tree, as in that year/month/day layout, means many directories scanned, at a cost of hundreds of milliseconds (or worse) per directory.

The layout suggested by Mariusz should be much more efficient, as it is a flatter directory tree; switching to it should have a bigger impact on performance on object stores than on real filesystems.
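
One way to keep the non-glob shortcut and still tolerate missing paths (the original question) is to pre-filter the explicit path list before handing it to Spark. A rough sketch using boto3; the bucket and key names are hypothetical:

import boto3

s3 = boto3.client('s3')

def existing(paths, bucket='dev'):
    """Keep only the paths whose objects actually exist in S3."""
    keep = []
    for p in paths:
        key = p.split(bucket + '/', 1)[1]  # strip the s3a://<bucket>/ prefix
        resp = s3.list_objects_v2(Bucket=bucket, Prefix=key, MaxKeys=1)
        if resp.get('KeyCount', 0) > 0:
            keep.append(p)
    return keep

files = existing(['s3a://dev/2017/01/03/data.parquet',
                  's3a://dev/2017/01/02/data.parquet'])
df = session.read.parquet(*files)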

stevel
0

A solution using `union`:

files = ['s3a://dev/2017/01/03/data.parquet',
         's3a://dev/2017/01/02/data.parquet']

for i, file in enumerate(files):
    act_df = spark.read.parquet(file)   
    if i == 0:
        df = act_df
    else:
        df = df.union(act_df)

An advantage is that it works regardless of any path pattern.
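
If some of the listed paths might not exist (the situation in the question), a variation of the same loop can skip them. A rough sketch, assuming a missing path surfaces as an `AnalysisException`:

from functools import reduce
from pyspark.sql.utils import AnalysisException

dfs = []
for file in files:
    try:
        dfs.append(spark.read.parquet(file))
    except AnalysisException:
        # Path does not exist (or is not readable as parquet): skip it.
        pass

df = reduce(lambda a, b: a.union(b), dfs)

Note that `union` matches columns by position, just like the loop above, so all files are expected to share the same schema.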

Pavel Prochazka
0
import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.transforms import *
from awsglue.dynamicframe import DynamicFrame
from awsglue.utils import getResolvedOptions
from awsglue.job import Job

import boto3


sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)


inputDyf = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={'paths': ["s3://dev-test-laxman-new-bucket/"]},
    format="parquet")

I am able to read multiple (2) parquet files from s3://dev-test-laxman-new-bucket/ and write them out as CSV files.

As you can see, I have 2 parquet files in my bucket:

[screenshot of the bucket listing]
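
The CSV write step is not shown above; a minimal sketch using Glue's writer, with a hypothetical output path:

# Write the DynamicFrame read above out as CSV; the target path is an assumption.
glueContext.write_dynamic_frame.from_options(
    frame=inputDyf,
    connection_type="s3",
    connection_options={"path": "s3://dev-test-laxman-new-bucket/output-csv/"},
    format="csv")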

Hope it will be helpful to others.

Gautam