63

I am looking for ways to read data from multiple partitioned directories from S3 using Python.

data_folder/serial_number=1/cur_date=20-12-2012/abcdsd0324324.snappy.parquet
data_folder/serial_number=2/cur_date=27-12-2012/asdsdfsd0324324.snappy.parquet

pyarrow's ParquetDataset module has the capability to read from partitions, so I have tried the following code:

>>> import pandas as pd
>>> import pyarrow.parquet as pq
>>> import s3fs
>>> a = "s3://my_bucker/path/to/data_folder/"
>>> dataset = pq.ParquetDataset(a)

It threw the following error:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/my_username/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py", line 502, in __init__
    self.metadata_path) = _make_manifest(path_or_paths, self.fs)
  File "/home/my_username/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py", line 601, in _make_manifest
    .format(path))
OSError: Passed non-file path: s3://my_bucker/path/to/data_folder/

Based on the documentation of pyarrow, I tried using s3fs as the file system, i.e.:

>>> dataset = pq.ParquetDataset(a,filesystem=s3fs)

Which throws the following error:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/my_username/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py", line 502, in __init__
    self.metadata_path) = _make_manifest(path_or_paths, self.fs)
  File "/home/my_username/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py", line 583, in _make_manifest
    if is_string(path_or_paths) and fs.isdir(path_or_paths):
AttributeError: module 's3fs' has no attribute 'isdir'

I am limited to using an ECS cluster, hence spark/pyspark is not an option.

Is there a way we can easily read the parquet files, in Python, from such partitioned directories in S3? I feel that listing all the directories and then reading them is not a good practice, as suggested in this link. I would need to convert the read data to a pandas dataframe for further processing & hence prefer options related to fastparquet or pyarrow. I am open to other options in Python as well.

krisfremen
stormfield
  • Let's discuss in https://issues.apache.org/jira/browse/ARROW-1213 and https://issues.apache.org/jira/browse/ARROW-1119. We must add some code to allow pyarrow to recognize the s3fs filesystem and add a shim / compatibility class to conform S3FS's slightly different filesystem API to pyarrow's. – Wes McKinney Jul 13 '17 at 14:25

5 Answers

61

I managed to get this working with the latest release of fastparquet & s3fs. Below is the code for the same:

import s3fs
import fastparquet as fp

# a single s3fs filesystem instance can be used both for globbing and for opening files
fs = s3fs.S3FileSystem()

# mybucket/data_folder/serial_number=1/cur_date=20-12-2012/abcdsd0324324.snappy.parquet
s3_path = "mybucket/data_folder/*/*/*.parquet"
all_paths_from_s3 = fs.glob(path=s3_path)

myopen = fs.open
# use s3fs as the filesystem
fp_obj = fp.ParquetFile(all_paths_from_s3, open_with=myopen)
# convert to pandas dataframe
df = fp_obj.to_pandas()

Credit to Martin for pointing me in the right direction via our conversation.

NB: This would be slower than using pyarrow, based on the benchmark. I will update my answer once s3fs support is implemented in pyarrow via ARROW-1213.

I did a quick benchmark on individual iterations with pyarrow & a list of files sent as a glob to fastparquet. fastparquet is faster with s3fs vs pyarrow + my hackish code, but I reckon pyarrow + s3fs will be faster once implemented.

The code & benchmarks are below:

>>> def test_pq():
...     for current_file in list_parquet_files:
...         f = fs.open(current_file)
...         df = pq.read_table(f).to_pandas()
...         # following code is to extract the serial_number & cur_date values so that we can add them to the dataframe
...         #probably not the best way to split :)
...         elements_list=current_file.split('/')
...         for item in elements_list:
...             if item.find(date_partition) != -1:
...                 current_date = item.split('=')[1]
...             elif item.find(dma_partition) != -1:
...                 current_dma = item.split('=')[1]
...         df['serial_number'] = current_dma
...         df['cur_date'] = current_date
...         list_.append(df)
...     frame = pd.concat(list_)
...
>>> timeit.timeit('test_pq()',number =10,globals=globals())
12.078817503992468

>>> def test_fp():
...     fp_obj = fp.ParquetFile(all_paths_from_s3,open_with=myopen)
...     df = fp_obj.to_pandas()

>>> timeit.timeit('test_fp()',number =10,globals=globals())
2.961556333000317

Update 2019

Issues such as ARROW-2038 & fastparquet PR #182 have since been resolved.

Read parquet files using Pyarrow

# pip install pyarrow
# pip install s3fs

>>> import s3fs
>>> import pyarrow.parquet as pq
>>> fs = s3fs.S3FileSystem()

>>> bucket = 'your-bucket-name'
>>> path = 'directory_name'  # if it's a directory, omit the trailing /
>>> bucket_uri = f's3://{bucket}/{path}'
>>> bucket_uri
's3://your-bucket-name/directory_name'

>>> dataset = pq.ParquetDataset(bucket_uri, filesystem=fs)
>>> table = dataset.read()
>>> df = table.to_pandas() 
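
If you only need some of the partitions, pq.ParquetDataset also accepts a filters argument that prunes hive-style partitions before reading. A minimal sketch, assuming the serial_number partition column from the question (depending on your pyarrow version the partition values may be compared as strings or as inferred integers):

>>> dataset = pq.ParquetDataset(
...     bucket_uri,
...     filesystem=fs,
...     filters=[('serial_number', '=', '1')]  # only read this partition
... )
>>> df = dataset.read().to_pandas()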

Read parquet files using fastparquet

# pip install s3fs
# pip install fastparquet

>>> import s3fs
>>> import fastparquet as fp

>>> fs = s3fs.S3FileSystem()
>>> myopen = fs.open

>>> bucket = 'your-bucket-name'
>>> path = 'directory_name'
>>> root_dir_path = f'{bucket}/{path}'
# the first two wildcards represent the 1st and 2nd partition columns of your data, and so forth
>>> s3_path = f"{root_dir_path}/*/*/*.parquet"
>>> all_paths_from_s3 = fs.glob(path=s3_path)

>>> fp_obj = fp.ParquetFile(all_paths_from_s3, open_with=myopen, root=root_dir_path)
>>> df = fp_obj.to_pandas()
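
If you only need a subset of columns, fastparquet's to_pandas also accepts a columns argument. A minimal sketch (my_column is a hypothetical column name standing in for one of your data columns):

>>> df = fp_obj.to_pandas(columns=['my_column'])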

Quick benchmarks

This is probably not the best way to benchmark it; please read the blog post for a thorough benchmark.

#pyarrow
>>> import timeit
>>> def test_pq():
...     dataset = pq.ParquetDataset(bucket_uri, filesystem=fs)
...     table = dataset.read()
...     df = table.to_pandas()
...
>>> timeit.timeit('test_pq()',number =10,globals=globals())
1.2677053569998407

#fastparquet
>>> def test_fp():
...     fp_obj = fp.ParquetFile(all_paths_from_s3,open_with=myopen, root=root_dir_path)
...     df = fp_obj.to_pandas()

>>> timeit.timeit('test_fp()',number =10,globals=globals())
2.931876824000028

Further reading regarding Pyarrow's speed


stormfield
  • Thank you for the thorough analysis; now that ARROW-1213 is resolved, do you have any new benchmarks to share? Thanks. – Todor Minakov Apr 05 '18 at 08:22
  • I think there are a few more bugs to be sorted out after ARROW-1213 is resolved. Please see https://issues.apache.org/jira/browse/ARROW-2038 . Meanwhile we can use something along the lines of what has been mentioned in https://github.com/apache/arrow/pull/916#issuecomment-337619158 if you need to use pyarrow – stormfield Nov 08 '18 at 01:50
  • @TodorMinakov I have updated the answer & benchmarks as well – stormfield Feb 16 '19 at 04:06
  • Does `timeit.timeit('test_fp',number =10,globals=globals())` actually call the `test_fp` function? Should that be `timeit.timeit('test_fp()',number=10,globals=globals())`? – Darren Weber May 15 '19 at 01:01
  • Hi @DarrenWeber good catch. Thank you! I probably should never code after 12 midnight again. I have updated the answer, with corrected code now. – stormfield May 16 '19 at 19:11
  • the combination of s3fs and pyarrow versions is crucial for making this work. for example, a combination that works is s3fs==0.3.5 and pyarrow>=0.14.0 – Vincent Claes Oct 08 '19 at 17:51
  • I am using pyarrow 0.13 and s3fs 0.2.1... I am facing the same issue. What is the minimum version of each to make this succeed? – Vikram Ranabhatt Aug 07 '20 at 20:54
  • Thanks! I am trying to iterate over a parquet file and create a Python generator where each element contains batch_size rows. Is there an iter_batches() equivalent for a pyarrow table created using pq.ParquetDataset()? – haneulkim Aug 08 '23 at 05:23
33

For Python 3.6+, AWS has a library called aws-data-wrangler that helps with the integration between Pandas/S3/Parquet.

To install, do:

pip install awswrangler

To read partitioned parquet from S3 using awswrangler 1.x.x and above, do:

import awswrangler as wr
df = wr.s3.read_parquet(path="s3://my_bucket/path/to/data_folder/", dataset=True)

By setting dataset=True, awswrangler expects partitioned parquet files. It will read all the individual parquet files from your partitions below the S3 key you specify in the path.
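
If you only want certain partitions loaded (see also the tutorial linked in the comments below), recent awswrangler versions accept a partition_filter callable. A minimal sketch, assuming the hive-style serial_number partition from the question; the partition values are passed to the callable as strings:

import awswrangler as wr

# keep only the partitions where serial_number == "1"
df = wr.s3.read_parquet(
    path="s3://my_bucket/path/to/data_folder/",
    dataset=True,
    partition_filter=lambda x: x["serial_number"] == "1",
)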

Vincent Claes
  • @Vincent_Claes Thank you for this. How do you specify that you only want a certain partition loaded? How do you apply that filter? Will the library do it for you? – rjurney Sep 08 '20 at 14:00
  • @rjurney awswrangler supports filtering on partitions. You can find some examples here: https://github.com/awslabs/aws-data-wrangler/blob/master/tutorials/023%20-%20Flexible%20Partitions%20Filter.ipynb – Vincent Claes Sep 08 '20 at 19:33
  • Thank you! This worked like a charm for me! – Mojgan Mazouchi Feb 23 '21 at 00:51
  • Where do you set the keys? Writing data to a private S3 bucket needs keys. – 2015evanotes May 28 '21 at 05:54
  • @2015evanotes do you mean KMS keys? if so, this answer can help https://stackoverflow.com/a/59713720/1771155 – Vincent Claes May 29 '21 at 13:33
10

For those of you who want to read in only parts of a partitioned parquet file, pyarrow accepts a list of keys as well as just the partial directory path to read in all parts of the partition. This method is especially useful for organizations that have partitioned their parquet datasets in a meaningful way, for example by year or country, allowing users to specify which parts of the file they need. This will reduce costs in the long run, as AWS charges per byte when reading in datasets.

# Read in user specified partitions of a partitioned parquet file 

import s3fs
import pyarrow.parquet as pq
s3 = s3fs.S3FileSystem()

keys = ['keyname/blah_blah/part-00000-cc2c2113-3985-46ac-9b50-987e9463390e-c000.snappy.parquet',
        'keyname/blah_blah/part-00001-cc2c2113-3985-46ac-9b50-987e9463390e-c000.snappy.parquet',
        'keyname/blah_blah/part-00002-cc2c2113-3985-46ac-9b50-987e9463390e-c000.snappy.parquet',
        'keyname/blah_blah/part-00003-cc2c2113-3985-46ac-9b50-987e9463390e-c000.snappy.parquet']

bucket = 'bucket_yada_yada_yada'

# Add s3 prefix and bucket name to all keys in list
parq_list=[]
for key in keys:
    parq_list.append('s3://'+bucket+'/'+key)

# Create your dataframe
df = pq.ParquetDataset(parq_list, filesystem=s3).read_pandas(columns=['Var1','Var2','Var3']).to_pandas()
Statmonger
5

This issue was resolved in this pull request in 2017.

For those who want to read parquet from S3 using only pyarrow, here is an example:

import s3fs
import pyarrow.parquet as pq

fs = s3fs.S3FileSystem()
bucket = "your-bucket"
path = "your-path"

# Python 3.6 or later
p_dataset = pq.ParquetDataset(
    f"s3://{bucket}/{path}",
    filesystem=fs
)
df = p_dataset.read().to_pandas()

# Pre-python 3.6
p_dataset = pq.ParquetDataset(
    "s3://{0}/{1}".format(bucket, path),
    filesystem=fs
)
df = p_dataset.read().to_pandas()
Asclepius
Eugene Brown
  • But I think there are a few more issues to be sorted out for the same. Please see: https://issues.apache.org/jira/browse/ARROW-2038 – stormfield Nov 08 '18 at 01:46
  • I don't think this prohibits anyone from using the code I've written above to do what the questioner asked. How is that discussion directly related to reading parquet from S3 using the above method? – Eugene Brown Nov 08 '18 at 17:25
  • I didn't say your code doesn't work. I meant there are a few more issues to be sorted out according to https://github.com/apache/arrow/pull/916#issuecomment-360541307 . From what I understand, I guess an edge case has been missed. So it might be better to use fastparquet over Arrow until ARROW-2038 is resolved. – stormfield Nov 08 '18 at 18:48
  • @efbbrown which s3fs and pyarrow versions did you try for this fix? – Vikram Ranabhatt Aug 07 '20 at 20:50
3

PyArrow 7.0.0 has some improvements to a new module, pyarrow.dataset, that is meant to abstract away the dataset concept from the previous, Parquet-specific pyarrow.parquet.ParquetDataset.

Assuming you are fine with the dataset schema being inferred from the first file, the example from the documentation for reading a partitioned dataset should just work.

Here's a more-complete example assuming you want to use data from S3:

import pyarrow.dataset as ds
from pyarrow import fs

s3 = fs.S3FileSystem()

dataset = ds.dataset(
    "my-bucket-name/my-path-to-dataset-partitions",
    format="parquet",
    filesystem=s3,
    partitioning="hive"
)

# Assuming your data is partitioned like year=2022/month=4/day=29
# this will only have to read the files for that day

expression = ((ds.field("year") == 2022) & (ds.field("month") == 4) & (ds.field("day") == 29))

pyarrow_table_2022_04_29 = dataset.to_table(filter=expression)
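
Since the question ultimately wants a pandas dataframe, the filtered table converts the same way as in the other answers:

# convert the filtered pyarrow Table to pandas for further processing
df_2022_04_29 = pyarrow_table_2022_04_29.to_pandas()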

A word of warning if you're defining the dataset schema yourself: the inference above with the partitioning argument automatically adds the partition columns to your dataset schema.

If you want the partitioning to work properly with a manually-defined dataset schema, you must make sure you add the partitions to the schema:

import pyarrow as pa

my_manual_schema = pa.schema([])  # Some pyarrow.Schema instance for your dataset

# Be sure to add the partitions even though they're not in the dataset files.
# Note: pa.Schema.append returns a new schema rather than modifying in place,
# so reassign the result.
my_manual_schema = my_manual_schema.append(pa.field("year", pa.int16()))
my_manual_schema = my_manual_schema.append(pa.field("month", pa.int8()))
my_manual_schema = my_manual_schema.append(pa.field("day", pa.int8()))

dataset = ds.dataset(
    "my-bucket-name/my-path-to-dataset-partitions",
    format="parquet",
    filesystem=s3,
    schema=my_manual_schema,
    partitioning="hive"
)
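
A short usage sketch under the same assumptions as above (hypothetical bucket path, hive-style year/month/day partitions): the filter expression from the first snippet works unchanged against the manually defined schema:

# prune to a single day's partition, exactly as before
table_2022_04_29 = dataset.to_table(
    filter=(ds.field("year") == 2022) & (ds.field("month") == 4) & (ds.field("day") == 29)
)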
Nadir Sidi
  • hi Nadir, great answer, i was trying to use `pd.read_parquet` and was disappointed to discover it doesn't work as expected due to [this bug](https://github.com/pandas-dev/pandas/issues/36743). btw this is Brent who you used to work with -- i was very happy that i stumbled upon your answer! – lunguini Apr 27 '23 at 09:14