
I have what seems like a simple question, but I cannot figure it out. I am trying to filter to a specific row, based on an id (primary key) column, because I want to spot-check it against the same id in another table where a transform has been applied.

More detail... I have a dataframe like this:

| id   | name | age |
|------|------|-----|
| 1112 | Bob  | 54  |
| 1123 | Sue  | 23  |
| 1234 | Jim  | 37  |
| 1251 | Mel  | 58  |
...

except it has ~300MM rows and ~2k columns. The obvious answer is something like `df.filter('id = 1234').show()`. The problem is that with ~300MM rows this query takes forever (as in 10-20 minutes on a ~20 node AWS EMR cluster).

I understand that it has to do a table scan, but fundamentally I don't understand why something like `df.filter('age > 50').show()` finishes in ~30 seconds while the id query takes so long. Don't they both have to do the same scan?
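
For reference, this is roughly how I am comparing the two (`df` being the big frame described above, timed crudely):

    import time

    # Fast case: lots of rows satisfy the predicate, returns in ~30 seconds.
    start = time.time()
    df.filter('age > 50').show()
    print('age filter took %.1f s' % (time.time() - start))

    # Slow case: only one row has this id, takes 10-20 minutes.
    start = time.time()
    df.filter('id = 1234').show()
    print('id filter took %.1f s' % (time.time() - start))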

Any insight is very welcome. I am using pyspark 2.4.0 on linux.

seth127

1 Answer


Don't they both have to do the same scan?

That depends on the data distribution.

First of all, show takes as little data as possible, so as long as there is enough data to collect 20 rows (the default value) it can process as little as a single partition, using LIMIT logic (you can check Spark count vs take and length for a detailed description of LIMIT behavior).
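
As a rough, self-contained illustration (a toy frame with the question's id/name/age schema; show(n) more or less boils down to collecting the first n rows, so writing the limit explicitly makes the CollectLimit node visible in the plan):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Tiny frame with the same columns as the question, just to inspect the plan.
    df = spark.createDataFrame(
        [(1112, "Bob", 54), (1123, "Sue", 23), (1234, "Jim", 37), (1251, "Mel", 58)],
        ["id", "name", "age"],
    )

    # show(n) only needs the first n matching rows; the CollectLimit node in the
    # plan is what lets Spark stop reading partitions once it has collected enough.
    df.filter('id = 1234').limit(20).explain()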

If 1234 were on the first partition and you explicitly set the limit to 1

df.filter('id = 1234').show(1)

the time would be comparable to the other example.

But if the number of values that satisfy the predicate is smaller than the limit, or the values of interest reside in later partitions, Spark will have to scan all the data.

If you want to make it faster you'll need the data bucketed (on disk) or partitioned (in memory) by the field of interest, or you'll need one of the proprietary extensions (like Databricks indexing) or specialized storage (like the unfortunately inactive Succinct).
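
For example, a minimal sketch of the bucketing route (the table name and bucket count are placeholders; `bucketBy` has to go through `saveAsTable`, and in 2.4 an equality filter on the bucketing column should be able to prune buckets):

    # Rewrite the data bucketed (and sorted) by id.
    (df.write
        .bucketBy(256, "id")    # placeholder bucket count - tune for your data
        .sortBy("id")
        .mode("overwrite")
        .saveAsTable("people_bucketed"))

    # A lookup on the bucketing column now only has to touch the bucket
    # that can contain id = 1234 instead of every file.
    spark.table("people_bucketed").filter("id = 1234").show()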

But really, if you need fast lookups, use a proper database - that's what they are designed for.

  • Would doing `df = df.repartition(df.rdd.getNumPartitions(), "id")` before the lookup help here? – pault Mar 20 '19 at 16:33
  • Thanks for the response. I had the same question as @pault. Someone else told me something like "you have to repartition it on `id` and rebuild the table with the new partitions..." Is that right or can you do something like pault suggests on the dataframe object? – seth127 Mar 20 '19 at 18:59
  • I believe pault's suggestion will decrease the amount of time the lookup will take, but increase the processing that must occur prior to the lookup. Depending on how parallelizable the operation is, there is a chance it will improve net query performance but I am not certain of that; both the repartition and the lookup are highly parallelizable. – Peter Dowdy May 13 '21 at 18:41