76

I have a hacky way of achieving this using boto3 (1.4.4), pyarrow (0.4.1) and pandas (0.20.3).

First, I can read a single parquet file locally like this:

import pyarrow.parquet as pq

path = 'parquet/part-r-00000-1e638be4-e31f-498a-a359-47d017a0059c.gz.parquet'
table = pq.read_table(path)
df = table.to_pandas()

I can also read a directory of parquet files locally like this:

import pyarrow.parquet as pq

dataset = pq.ParquetDataset('parquet/')
table = dataset.read()
df = table.to_pandas()

Both work like a charm. Now I want to achieve the same remotely with files stored in an S3 bucket. I was hoping that something like this would work:

dataset = pq.ParquetDataset('s3n://dsn/to/my/bucket')

But it does not:

OSError: Passed non-file path: s3n://dsn/to/my/bucket

After reading pyarrow's documentation thoroughly, this does not seem possible at the moment. So I came up with the following solution:

Reading a single file from S3 and getting a pandas dataframe:

import io
import boto3
import pyarrow.parquet as pq

buffer = io.BytesIO()
s3 = boto3.resource('s3')
s3_object = s3.Object('bucket-name', 'key/to/parquet/file.gz.parquet')
s3_object.download_fileobj(buffer)
table = pq.read_table(buffer)
df = table.to_pandas()

And here is my hacky, not-so-optimized solution to create a pandas dataframe from an S3 folder path:

import io
import boto3
import pandas as pd
import pyarrow.parquet as pq

bucket_name = 'bucket-name'
def download_s3_parquet_file(s3, bucket, key):
    buffer = io.BytesIO()
    s3.Object(bucket, key).download_fileobj(buffer)
    return buffer

client = boto3.client('s3')
s3 = boto3.resource('s3')
objects_dict = client.list_objects_v2(Bucket=bucket_name, Prefix='my/folder/prefix')
s3_keys = [item['Key'] for item in objects_dict['Contents'] if item['Key'].endswith('.parquet')]
buffers = [download_s3_parquet_file(s3, bucket_name, key) for key in s3_keys]
dfs = [pq.read_table(buffer).to_pandas() for buffer in buffers]
df = pd.concat(dfs, ignore_index=True)

Is there a better way to achieve this? Maybe some kind of connector for pandas using pyarrow? I would like to avoid using pyspark, but if there is no other solution, then I would take it.

Diego Mora Cespedes
  • Have you considered reading them with dask? I'm able to do the same in two lines. – rpanai Feb 15 '18 at 16:09
  • Got your hacky solution working only after filtering to files with size > 0: `dfs = [pq.read_table(buffer).to_pandas() for buffer in buffers if len(buffer.getvalue()) > 0]`. Thanks! Unfortunately the answers here don't quite address this issue. – Wassadamo Apr 03 '21 at 23:18

9 Answers

73

You should use the s3fs module, as proposed by yjk21. However, as a result of calling ParquetDataset you'll get a pyarrow.parquet.ParquetDataset object. To get the pandas DataFrame you'll rather want to apply .read_pandas().to_pandas() to it:

import pyarrow.parquet as pq
import s3fs
s3 = s3fs.S3FileSystem()

pandas_dataframe = pq.ParquetDataset('s3://your-bucket/', filesystem=s3).read_pandas().to_pandas()
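
If you only want a subset of the columns, or a specific prefix inside the bucket rather than the whole bucket, the same API accepts those too. A small sketch reusing the s3 filesystem from above; the prefix and column names are placeholders:

dataset = pq.ParquetDataset('s3://your-bucket/path/to/parquet/', filesystem=s3)
df = dataset.read_pandas(columns=['col_a', 'col_b']).to_pandas()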
vak
  • I got `ValueError: Found files in an intermediate directory`. Any idea? – Mithril Nov 26 '18 at 09:14
  • @Mithril Your error message is unfortunately truncated, so I can't see the directory mentioned. However, one could make a guess: when partitioned parquet files are stored to S3, they are usually first written to a "_temporary" directory. If this directory is not empty, it is a clear sign that the S3 location contains incomplete (broken) data. – vak Nov 27 '18 at 14:13
  • What happens when you do this? Does it stream, or is it copied to local storage? If I open a 10GB file twice, does it download it twice? What if I open a 10GB file with only 5GB of local storage, does it stream or download it all? – citynorman Jul 27 '19 at 03:28
  • When I specify the key where all my parquet files reside I get `ArrowIOError: Invalid Parquet file size is 0 bytes`. When I explicitly specify the parquet file, it works. I use s3fs == 0.3.5 and pyarrow == 0.15.0. @vak any idea why I cannot read all the parquet files in the s3 key like you did? – Vincent Claes Oct 12 '19 at 15:44
  • @VincentClaes trailing slash? – vak Oct 22 '19 at 12:10
  • hmm that gives me `OSError: Unrecognized filesystem: ` – Sonic Soul Mar 03 '20 at 15:56
  • I get `OSError: Passed non-file path: s3a://bucket/path/to/my/part_files.parquet/` – Wassadamo Dec 01 '20 at 19:13
39

Thanks! Your question actually tells me a lot. This is how I do it now with pandas (0.21.1), which will call pyarrow, and boto3 (1.3.1).

import boto3
import io
import pandas as pd

# Read single parquet file from S3
def pd_read_s3_parquet(key, bucket, s3_client=None, **args):
    if s3_client is None:
        s3_client = boto3.client('s3')
    obj = s3_client.get_object(Bucket=bucket, Key=key)
    return pd.read_parquet(io.BytesIO(obj['Body'].read()), **args)

# Read multiple parquets from a folder on S3 generated by spark
def pd_read_s3_multiple_parquets(filepath, bucket, s3=None, 
                                 s3_client=None, verbose=False, **args):
    if not filepath.endswith('/'):
        filepath = filepath + '/'  # Add '/' to the end
    if s3_client is None:
        s3_client = boto3.client('s3')
    if s3 is None:
        s3 = boto3.resource('s3')
    s3_keys = [item.key for item in s3.Bucket(bucket).objects.filter(Prefix=filepath)
               if item.key.endswith('.parquet')]
    if not s3_keys:
        print('No parquet found in', bucket, filepath)
    elif verbose:
        print('Load parquets:')
        for p in s3_keys: 
            print(p)
    dfs = [pd_read_s3_parquet(key, bucket=bucket, s3_client=s3_client, **args) 
           for key in s3_keys]
    return pd.concat(dfs, ignore_index=True)

Then you can read multiple parquets under a folder from S3 by

df = pd_read_s3_multiple_parquets('path/to/folder', 'my_bucket')

(One can simplify this code a lot I guess.)
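
Because **args is forwarded to pd.read_parquet, you can also pass its options through, e.g. to load only a subset of columns (the column names here are just placeholders):

df = pd_read_s3_multiple_parquets('path/to/folder', 'my_bucket', columns=['id', 'value'])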

Louis Yang
15

It can also be done using boto3 without calling pyarrow directly (pandas will use whichever parquet engine is installed, e.g. pyarrow or fastparquet):

import boto3
import io
import pandas as pd

# Read the parquet file
buffer = io.BytesIO()
s3 = boto3.resource('s3')
s3_object = s3.Object('bucket_name', 'key')
s3_object.download_fileobj(buffer)
df = pd.read_parquet(buffer)

print(df.head())
oya163
13

Provided you have the right package setup

$ pip install pandas==1.1.0 pyarrow==1.0.0 s3fs==0.4.2

and your AWS shared config and credentials files are configured appropriately,

you can use pandas right away:

import pandas as pd

df = pd.read_parquet("s3://bucket/key.parquet")

If you have multiple AWS profiles you may also need to set

$ export AWS_DEFAULT_PROFILE=profile_under_which_the_bucket_is_accessible

so you can access your bucket.
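
Depending on your pandas and pyarrow versions, pointing read_parquet at a prefix that contains several parquet files may also work. A hedged sketch, with placeholder bucket and folder names:

df = pd.read_parquet("s3://bucket/path/to/folder/")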

ayorgo
12

Probably the easiest way to read parquet data on the cloud into dataframes is to use dask.dataframe in this way:

import dask.dataframe as dd
df = dd.read_parquet('s3://bucket/path/to/data-*.parq')

dask.dataframe can read from Google Cloud Storage, Amazon S3, Hadoop file system and more!
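
dd.read_parquet returns a lazy dask DataFrame; if you want a plain in-memory pandas DataFrame you can materialize it with .compute(). A short sketch with a placeholder path:

import dask.dataframe as dd

ddf = dd.read_parquet('s3://bucket/path/to/data-*.parq')
df = ddf.compute()  # pulls the data into a regular pandas DataFrame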

Rich Signell
  • The only problem with this solution is that you can't distribute it on a cluster in case you need to. – rpanai Dec 05 '18 at 14:19
  • My issue is that I get an error reading too many files. Could it be that dask can't handle it and I therefore need to resort to another solution? – msarafzadeh May 31 '20 at 13:17
  • I have 0-byte _SUCCESS header files in my directory of parquet files. Tried excluding them in the glob with: `dd.read_parquet('s3://bucket/test.parquet/[!_]*')` but resulted in `IndexError: list index out of range`. Either because no matches, or some 0-size files. – Wassadamo Apr 03 '21 at 23:16
  • `pip install dask[dataframe]` – jtlz2 May 31 '22 at 20:27
8

If you are open to also using AWS Data Wrangler:

import awswrangler as wr

df = wr.s3.read_parquet(path="s3://...")
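
As Vincent Claes points out in the comments below, reading a whole prefix of parquet files requires dataset=True (the path here is a placeholder):

df = wr.s3.read_parquet(path="s3://bucket/prefix/", dataset=True)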
Igor Tavares
  • I don't know why, but it takes a lot of time to read the data, like 6 sec for only 1.3 MB of dataset? – kanav anand Feb 18 '20 at 05:36
  • Your example will expect one parquet file. You need to set the parameter `dataset = True` to read a list of parquet files. – Vincent Claes Sep 12 '20 at 12:29
  • For some reason I am getting a timeout error: `df = wr.s3.read_parquet(path="s3://......../", path_suffix=".snappy.parquet", dataset=True)` gives `"errorMessage": "................................ Task timed out after 3.14 seconds"`. Any inputs? – NNM Sep 02 '22 at 18:28
4

You can use s3fs (from the Dask project), which implements a filesystem interface for S3. Then you can pass it via the filesystem argument of ParquetDataset like so:

import pyarrow.parquet as pq
import s3fs

s3 = s3fs.S3FileSystem()
dataset = pq.ParquetDataset('s3n://dsn/to/my/bucket', filesystem=s3)
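
From there, as the top answer above shows, the dataset can be turned into a pandas DataFrame:

df = dataset.read_pandas().to_pandas()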
yjk21
1

Using pre-signed URLs:

import s3fs
import dask.dataframe as dd

s3 = s3fs.S3FileSystem(key='your_key', secret='your_secret', client_kwargs={"endpoint_url": 'your_end_point'})

df = dd.read_parquet(s3.url('your_bucket' + 'your_filepath', expires=3600, client_method='get_object'))
cottontail
0

I have tried the @oya163 solution and it works, but only after a small change:

import boto3
import io
import pandas as pd

# Read the parquet file
buffer = io.BytesIO()
s3 = boto3.resource('s3',aws_access_key_id='123',aws_secret_access_key= '456')
s3_object = s3.Object('bucket_name', 'myoutput.parquet')
s3_object.download_fileobj(buffer)
df = pd.read_parquet(buffer)

print(df.head())   
Learner