
I have a PySpark dataframe named df. I want to know whether its columns contain NAs; I don't care if it is just one row or all of them. The problem is that my current way of checking for NAs is this:

from pyspark.sql import functions as F

if (df.where(F.isnull('column_name')).count() >= 1):
    print("There are nulls")
else:
    print("Yey! No nulls")

The issue I see here is that it computes the number of nulls in the whole column, which is a huge waste of time: I want the process to stop as soon as it finds the first null.

I thought about this solution, but I am not sure it helps (I work on a shared cluster, so execution time depends on whatever jobs other people are running, and I can't compare the two approaches under equal conditions):

(df.where(F.isnull('column_name')).limit(1).count() == 1)

Does adding the limit help? Are there more efficient ways to achieve this?

Manrique

1 Answer


There is no non-exhaustive search for something that isn't there.

We can probably squeeze a lot more performance out of your query for the case where a record with a null value exists (see below), but what about when it doesn't? If you're planning on running this query multiple times, with the answer changing each time, you should be aware (I don't mean to imply that you aren't) that if the answer is "there are no null values in the entire dataframe", then you will have to scan the entire dataframe to know this, and there isn't a fast way to do that. If you need this kind of information frequently and the answer can frequently be "no", you'll almost certainly want to persist this kind of information somewhere, and update it whenever you insert a record that might have null values by checking just that record.
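
For illustration, here is a minimal sketch of that idea. The names append_batch, data_path, and status_path are hypothetical, and storing the flag as a one-row parquet file is just one option; the point is that only the incoming records get scanned:

from pyspark.sql import functions as F

def append_batch(spark, new_batch_df, data_path, status_path):
    # Inspect only the incoming records, not the whole dataframe.
    batch_has_nulls = len(new_batch_df.where(F.isnull('column_name')).take(1)) > 0

    # Append the new data as usual.
    new_batch_df.write.mode('append').parquet(data_path)

    # The flag can only flip to True here: a clean batch says nothing about the
    # data that is already stored, so in that case we leave the flag untouched.
    if batch_has_nulls:
        spark.createDataFrame([(True,)], ['has_nulls']) \
            .write.mode('overwrite').parquet(status_path)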

Don't use count().

count() is probably making things worse.

  • In the count case Spark uses a wide transformation: it applies LocalLimit on each partition and shuffles the partial results to perform GlobalLimit.
  • In the take case Spark uses a narrow transformation and evaluates LocalLimit only on the first partition.

In other words, .limit(1).count() is likely to select one example from each partition of your dataset, before selecting one example from that list of examples. Your intent is to abort as soon as a single example is found, but unfortunately, count() doesn't seem smart enough to achieve that on its own.
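
If you want to see this without racing everyone else's jobs for cluster time, you can compare the physical plans instead of timing the queries. This is only a way to inspect the plans, and the exact operator names vary by Spark version:

from pyspark.sql import functions as F

# Plan for the limit-then-count shape; groupBy().count() is used here only to make
# the aggregation visible in the plan (count() the action runs a similar plan).
# Expect limit operators on both sides of an exchange.
df.where(F.isnull('column_name')).limit(1).groupBy().count().explain()

# Plan for the shape that take(1)/first()/head() execute: a limited result is
# collected to the driver, so Spark can stop after scanning few partitions.
df.where(F.isnull('column_name')).limit(1).explain()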

As alluded to by the same example, though, you can use take(), first(), or head() to achieve the use case you want. This will more effectively limit the number of partitions that are examined:

If no shuffle is required (no aggregations, joins, or sorts), these operations will be optimized to inspect enough partitions to satisfy the operation - likely a much smaller subset of the overall partitions of the dataset.
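
Concretely, that is a direct swap of count() for take(1) in your check; take(1) returns a (possibly empty) list of Rows, so its truthiness answers the question:

from pyspark.sql import functions as F

# Spark stops scanning once it has found a single matching row.
if df.where(F.isnull('column_name')).take(1):
    print("There are nulls")
else:
    print("Yey! No nulls")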

Please note, count() can be more performant in other cases. As the other SO question rightly pointed out,

neither guarantees better performance in general.

There may be more you can do.

Depending on your storage method and schema, you might be able to squeeze more performance out of your query.

  • Since you aren't even interested in the value of the row that was chosen in this case, you can throw a select(F.lit(True)) between your isnull and your take (see the sketch after this list). This should in theory reduce the amount of information the workers in the cluster need to transfer. It is unlikely to matter if you have only a few columns of simple types, but if you have complex data structures, it can help and is very unlikely to hurt.
  • If you know how your data is partitioned and you know which partition(s) you're interested in or have a very good guess about which partition(s) (if any) are likely to contain null values, you should definitely filter your dataframe by that partition to speed up your query.
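
Putting both ideas together might look roughly like this; the partition column partition_date and its value are made up, so adapt them to your schema:

from pyspark.sql import functions as F

has_nulls = bool(
    df
    # Hypothetical partition filter: prune partitions that can't contain the
    # nulls you care about, so far fewer files are even considered.
    .where(F.col('partition_date') == '2019-01-01')
    .where(F.isnull('column_name'))
    # Only existence matters, not the row's contents, so ship a constant instead.
    .select(F.lit(True))
    .take(1)
)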
Jesse Amano