0

I have a spark dataframe as follows:

+-----+------+
|A    | count|
+-----+------+
|dummy|23    |
|ABC  |157   |
|abc  |15    |
+-----+------+

I am trying to find the max value out of this column [157 in the example above] and this is what I have done:

max_value = df.agg({"count": "max"}).collect()[0][0]

I am new to Spark programming. Although the solution above works, I am unsure how efficient it will be for large data [say a few million rows], as it involves a reduction component. Are there more efficient ways to get the max value out of a column?

PS: I have gone through many solutions on the internet [such as: https://stackoverflow.com/questions/33224740/best-way-to-get-the-max-value-in-a-spark-dataframe-column] and have not come across one dealing with the performance.

EDIT 1: The dataframe I am dealing with has multiple columns of large data.

EDIT 2: These are the transformations performed on the data before the max value is fetched:

a) I get my input data from Google Cloud Platform (in Parquet).

b) This data is converted into a pyspark dataframe.

c) I then add a "count" column to this dataframe.

d) Then, from the "count" column, I would like to fetch the max value.

mang4521
  • The code is fine (although it can be simplified). Much more important is the way you store the data. Hopefully you are using Delta or at least Parquet. – David דודו Markovitz Mar 11 '22 at 16:02
  • Yes, all the data is in Parquet format. I tested the code with a large dataset. The piece of code that gets the max value is relatively time consuming [but it still isn't beyond the threshold]. I therefore wanted to confirm whether there are more efficient ways to do the same. – mang4521 Mar 11 '22 at 16:09
  • Could you share some numbers? Data volume? Number of files? Execution time? – David דודו Markovitz Mar 11 '22 at 16:10
  • Definitely. I shall share some data around this as and when I have it. – mang4521 Mar 11 '22 at 16:25
  • @DavidדודוMarkovitz I am dealing with close to 1111601 rows and the time the code above took to get the max value is ~5min [296 seconds to be exact]. – mang4521 Mar 13 '22 at 09:08
  • @DavidדודוMarkovitz I do see this warning as soon as the execution to calculate the max value starts: *WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.* – mang4521 Mar 13 '22 at 09:20
  • Nothing here adds up. This count should take a second. Do you really have a single column in your data frame? Is it really an integer? Are the files really stored as Parquet? What Spark version are you using? – David דודו Markovitz Mar 13 '22 at 11:51
  • @DavidדודוMarkovitz No, the dataframe does not have a single column [it has multiple columns], but I would like to work only with the "count" column. Yes, the "count" column is generated by me and is of type integer. Yes, I am fetching the data from GCP and it is in Parquet format. I am on Spark version 2.4.8. I suspect that the presence of other columns is causing this latency. – mang4521 Mar 13 '22 at 13:14
  • What do you mean by "fetching the data from GCP"? Parquet is a columnar format, which means that if you query a single column, only the data of this column is being read. – David דודו Markovitz Mar 13 '22 at 17:56
  • I have a hunch that your slowness is because of the `collect` operation. Can you check your driver logs and look at the GC, or even at the Ganglia UI? – Dipanjan Mallick Mar 13 '22 at 18:06
  • @DavidדודוMarkovitz The process is pretty straightforward. I get my input data from Google Cloud Platform (in Parquet). This data is converted into a pyspark dataframe. I then add a "count" column to this dataframe. Then, from the "count" column, I would like to fetch the max value. This **last step alone** is taking a lot of time. All the other steps are pretty quick. – mang4521 Mar 14 '22 at 05:38
  • @DKNY as far as I understand, collect() collates the distributed data into the local system [the master in this case] and is therefore definitely a time consuming process. That being said, it should still not take as much time as it is taking for me [given the size of the data]. Should it? – mang4521 Mar 14 '22 at 05:38
  • @mang4521 as I understand it, you are taking only the first max value from the column, hence you can use take(1) instead of collect. – Mahesh Gupta Mar 14 '22 at 06:34
  • It is quite clear now. As I understand it, `count` is not taken from the data files. It is a computed column (e.g. counting the strings' length in another column that might hold 100GB of data). FYI, Spark is lazy. It doesn't start processing the data until the moment it is required to produce some results (collect / take / show / save etc.). "All the other steps are pretty quick" because they didn't do anything. The `collect()` operation in your case has no impact on the performance, since you only collect a single value, **but** this is the action that triggers the actual data processing. – David דודו Markovitz Mar 14 '22 at 06:39
  • Having said that, it means you've missed all the "good" stuff from your question, which is the transformations you are doing in order to get `count` – David דודו Markovitz Mar 14 '22 at 06:51
  • @DavidדודוMarkovitz I am new to spark programming, but what I understand from your analysis is the **lazy loading** aspect of spark [similar to let's say Spring]. Am I correct in assuming this? If this is the case, then is the latency shared above acceptable? – mang4521 Mar 14 '22 at 07:27
  • I have no idea what would be a reasonable execution time, since I have no idea what you are doing in order to compute `count`. – David דודו Markovitz Mar 14 '22 at 07:32
  • @DavidדודוMarkovitz, Yes, I do get your point. What I am asking here is whether the "max" function is a time consuming process when multiple transformations are made beforehand and the data itself is huge. – mang4521 Mar 14 '22 at 07:50
  • A max operation over a few million records is no sweat. You should get the result in seconds. You can of course prove it by first persisting the dataframe and only then calculating the max. – David דודו Markovitz Mar 14 '22 at 08:10
  • @DavidדודוMarkovitz A quick question here. Could you elaborate on this: "persisting the dataframe"? What do you mean by persisting the dataframe here? And as usual thank you for your guidance. – mang4521 Mar 14 '22 at 09:59
  • See https://sparkbyexamples.com/spark/spark-persistence-storage-levels/. You can store the dataframe to cache and/or disk and then run some action on it (e.g. `count()`) before you run your final aggregation (e.g. `df.persist().count()`). This will trigger the transformation chain (according to what you say, it's going to take ~5 minutes). After that you can run the aggregation and see the actual time that it takes. Alternatively, you can store the dataframe (df.write), then read it back and do the aggregation. – David דודו Markovitz Mar 14 '22 at 11:56
  • @DavidדודוMarkovitz Thanks a ton!! Let me go through all these techniques to better understand the methodology and in turn improve the performance. – mang4521 Mar 14 '22 at 14:58
  • @DavidדודוMarkovitz I gave your suggestion a try and it worked perfectly well. I have now been able to reduce the time taken to get the max value to 0.7s. Due to lazy loading, it is indeed true that the count() step takes time, but everything after it executes quickly. – mang4521 Mar 16 '22 at 13:35
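
The lazy-evaluation point made in the comments above can be illustrated without Spark. The following is only a plain-Python analogy using a generator, not Spark code; `expensive_count` is a hypothetical stand-in for whatever transformation derives the "count" column. It shows that defining the pipeline costs nothing and that the work happens only when the final "action" (here `max`) consumes it.

```python
calls = []  # records every invocation of the expensive step


def expensive_count(value):
    """Hypothetical stand-in for the costly transformation that derives "count"."""
    calls.append(value)
    return len(value)


rows = ["dummy", "ABC", "abc"]

# "Transformation": this only builds a lazy pipeline, nothing is computed yet.
counts = (expensive_count(row) for row in rows)
work_before_action = len(calls)

# "Action": consuming the pipeline is what actually triggers the work.
max_count = max(counts)
work_after_action = len(calls)

print(work_before_action, work_after_action, max_count)
```

In the same way, timing only the last step of a Spark job attributes the cost of every earlier transformation to that step.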

3 Answers

2

-- Not an answer --

A quick POC for @BennyElgazar.
Executed on Azure Databricks using a single VM cluster, with 4 cores.

import pyspark.sql.functions as F

df = spark.range(1000000000).orderBy(F.rand())

df.agg({"id":"max"}).first()[0]

Out[34]: 999999999 Command took 0.64 seconds

df.sort(F.col("id").desc()).first().asDict()['id']

Out[35]: 999999999 Command took 55.38 seconds


P.S.
There is no need for asDict()

Any of the following works just fine:

df.sort(F.col("id").desc()).first()[0]
df.sort(F.col("id").desc()).first().id
df.sort(F.col("id").desc()).first()['id']
David דודו Markovitz
  • My tests did indicate this. There wasn't that big a difference between the two. Although, the sort solution took more time. – mang4521 Mar 14 '22 at 10:01
0

As suggested by @DavidדודוMarkovitz above, the way to reduce the execution time of the max step is to make use of Spark's caching or persistence mechanisms. This is elaborated below:

  • Store the dataframe to cache and/or to disk.
  • Run a simple dataframe action [such as count()] on this dataframe.
  • After this, run the actual aggregation [in this case, getting the max value].

The intermediate step [count()] takes some time, because Spark evaluates lazily and this action is what triggers the pending transformations. After it, all other operations on the persisted dataframe take minimal time to complete.
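
The steps above can be sketched as follows. This is a minimal sketch, assuming `df` is a PySpark DataFrame that already carries the computed "count" column; the helper name `materialize_then_max` is hypothetical and not part of Spark. It times the materialization and the max aggregation separately, so the cost of the upstream transformations is not attributed to the max.

```python
import time


def materialize_then_max(df, column="count"):
    """Persist df, force it to materialize, then time the max aggregation.

    `df` is assumed to be a pyspark.sql.DataFrame; this helper is
    illustrative only and is not a Spark API.
    """
    df = df.persist()  # mark the dataframe for caching (lazy, no work yet)

    start = time.perf_counter()
    df.count()  # action: runs the pending transformations and fills the cache
    materialize_seconds = time.perf_counter() - start

    start = time.perf_counter()
    max_value = df.agg({column: "max"}).first()[0]  # reads the cached data
    max_seconds = time.perf_counter() - start

    return max_value, materialize_seconds, max_seconds
```

A call such as `materialize_then_max(df)` returns the max together with both timings; according to the thread, nearly all of the time should show up in the materialization step.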

mang4521
-2

What is the motivation for aggregating rather than sorting, if your interest is only in the max count, which you already have according to your example?

Why not just DF.sort(f.col("count").desc()).first().asDict()['count']?

Benny Elgazar
  • I did give this approach a try. My earlier approach used to take around 295s [as already mentioned above]. This took close to 315s. Converting this into a dictionary is basically collating the distributed data into the local system, is it not? – mang4521 Mar 14 '22 at 05:41
  • You seriously suggest that a sort operation is cheaper than a max operation? – David דודו Markovitz Mar 14 '22 at 06:42
  • @DavidדודוMarkovitz Of course. In some scenarios it works better, for example when you have mid-level datasets with no fat columns. When it works, I get a 3x to 10x boost in performance. – Benny Elgazar Mar 14 '22 at 08:06
  • No it doesn't. sort is an O(n*log(n)) operation while max is an O(n) operation. Check the POC I have just posted. – David דודו Markovitz Mar 14 '22 at 08:36
  • @DavidדודוMarkovitz sort is indeed O(n*log(n)), but I have found that aggregating between partitions and moving data between executors is sometimes more expensive, especially when you have fat columns that you need to consider regardless of the operation. – Benny Elgazar Mar 14 '22 at 08:45
  • (1) Using max without groupBy doesn't cause shuffling of the raw data. It uses partial_max (max per partition) followed by finalmerge_max (merging all partial results in a single place and finding their max). (2) I would love to see the scenario you have described. – David דודו Markovitz Mar 14 '22 at 09:03
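
The two-phase aggregation described in the last comment can be sketched in plain Python. This is an illustration of the idea only, not Spark's implementation; the partitions are modelled as plain lists and the function names are hypothetical, chosen to mirror the partial_max and finalmerge_max labels from the comment. Only one value per partition has to move to the merge step, which is why the raw rows are not shuffled.

```python
def partial_max(partition):
    """Phase 1: max within a single partition (None for an empty one)."""
    return max(partition) if partition else None


def final_merge_max(partials):
    """Phase 2: merge the per-partition results into a single value."""
    non_empty = [p for p in partials if p is not None]
    return max(non_empty) if non_empty else None


def two_phase_max(partitions):
    # Each partition contributes exactly one candidate to the merge step.
    return final_merge_max([partial_max(p) for p in partitions])


# The "count" values from the question, split across three partitions.
partitions = [[23, 157], [15], []]
print(two_phase_max(partitions))
```

Each element is inspected once, so the total work stays linear in the number of rows, in contrast to a full sort.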