
I've noticed an issue while trying to apply filters on PyArrow datasets initialised from Delta tables. Specifically, an `is_null` filter expression only seems to return rows when every row in a given partition/parquet file has a null value for the filtered column. I'm using the delta-rs library to create the PyArrow dataset from the Delta table, so I'm unsure whether this is a bug in PyArrow, a bug in delta-rs, or not a bug at all and I'm simply using the wrong operation for my use case.

I've created a minimal example using the following data, written to a Delta table with PySpark:

import pyspark
from delta import configure_spark_with_delta_pip
from pyspark.sql.types import StructType, StructField, StringType

builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
spark = configure_spark_with_delta_pip(builder).getOrCreate()

data = [['a', None],
        ['a', 'exists'],
        ['b', None],
        ['b', None]]

schema = StructType([
    StructField("key", StringType()),
    StructField('null_column', StringType())
])

spark_df = spark.createDataFrame(data=data, schema=schema)
# repartition(2, "key") hash-partitions on "key" so the 'a' and 'b' rows land
# in separate parquet files (confirmed by the log below)
spark_df.repartition(2, "key").write.format("delta").mode("overwrite").save("./table")

This results in the following entries in the delta log (included to show that the per-file stats correctly account for all 3 nulls: a nullCount of 1 for null_column in the 'a' file and 2 in the 'b' file):

{"commitInfo":{"timestamp":1687784715118,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[]"},"isolationLevel":"Serializable","isBlindAppend":false,"operationMetrics":{"numFiles":"2","numOutputRows":"4","numOutputBytes":"1436"},"engineInfo":"Apache-Spark/3.4.0 Delta-Lake/2.4.0","txnId":"d718e2b3-c062-4783-987f-97cab1c5f110"}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"eec58790-7456-4bfb-9fd5-6a576e46851b","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"key\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"null_column\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1687784710499}}
{"add":{"path":"part-00000-ada91cf1-2cd6-4928-af3b-0b1bcc987bfc-c000.snappy.parquet","partitionValues":{},"size":745,"modificationTime":1687784712864,"dataChange":true,"stats":"{\"numRecords\":2,\"minValues\":{\"key\":\"a\",\"null_column\":\"exists\"},\"maxValues\":{\"key\":\"a\",\"null_column\":\"exists\"},\"nullCount\":{\"key\":0,\"null_column\":1}}"}}
{"add":{"path":"part-00001-85bbf245-a29d-40bc-be96-86391569710c-c000.snappy.parquet","partitionValues":{},"size":691,"modificationTime":1687784712854,"dataChange":true,"stats":"{\"numRecords\":2,\"minValues\":{\"key\":\"b\"},\"maxValues\":{\"key\":\"b\"},\"nullCount\":{\"key\":0,\"null_column\":2}}"}}

Now, in a separate file, I ran the following code using the delta-rs library to try to extract all rows whose null_column value is null:

from deltalake import DeltaTable
import pyarrow.compute as pc

ds = DeltaTable("./table").to_pyarrow_dataset()
pat = ds.to_table(filter=(pc.field("null_column").is_null()))
print(pat.num_rows)
print(pat.to_pandas().head())

I expected the result to have 3 rows (the 2 rows with key 'b' and 1 row with key 'a'), but it only contains the 2 rows with key 'b'. Any input on whether this is a bug or expected behaviour would be great. And if it is indeed expected behaviour, I'd really appreciate some help on how to get what I need, i.e. all 3 rows. Thanks!
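
In the meantime, one workaround that appears to sidestep the per-file pruning is to materialize the full table first and then filter with a boolean mask (a sketch; note it reads the whole table into memory before filtering):

# Workaround sketch: skip the dataset-level filter, materialize everything,
# then filter row-by-row with a boolean mask computed on the loaded table.
pat_all = ds.to_table()
mask = pc.is_null(pat_all["null_column"])
print(pat_all.filter(mask).num_rows)  # 3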

  • My (limited) understanding is that the bug happens here: https://github.com/delta-io/delta-rs/blob/e5dd8e2167b94e6856aa531d878584397d5bea69/python/deltalake/table.py#L493 When generating fragments, delta-rs thinks that the data is partitioned into two files based on `null_column=None`. You can see it by calling `list(ds.get_fragments())`. If you load the dataset as a plain arrow dataset, it works as expected: `pa.dataset.dataset("./table")`. You may want to raise an issue with delta-rs. – 0x26res Jun 26 '23 at 18:07
  • Hi, I don't think I completely understand your reasoning about the cause. I did see `partition=[null_column=None]` when calling `list(ds.get_fragments())`, but after changing the table to hold only the first 2 rows of the above data (so there is only one parquet file), the partition part is no longer shown, yet the bug remains: 0 rows are returned. You are right about `pa.dataset.dataset("./table")` working as expected, though, so I suppose the bug must be somewhere in `to_pyarrow_dataset()`. Do you know of any issues with using that to load Delta tables? I've added a sketch of both checks below these comments. – Rohith Paul Jun 27 '23 at 05:48
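
Here's a sketch of both checks suggested in the comments (the exact fragment repr may vary with the delta-rs/pyarrow versions):

import pyarrow.dataset as pads

# Check 1: inspect the fragments delta-rs generates. On the affected version,
# each fragment carries a partition expression involving null_column even
# though the table was written without any partition columns.
for frag in ds.get_fragments():
    print(frag.path, frag.partition_expression)

# Check 2: point a plain arrow dataset at the same directory (pyarrow's file
# discovery skips _delta_log because the name starts with an underscore) and
# apply the identical filter; this returns all 3 rows.
plain = pads.dataset("./table", format="parquet")
print(plain.to_table(filter=pc.field("null_column").is_null()).num_rows)  # 3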

0 Answers