10
import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
import findspark
from pyspark.sql.functions import countDistinct
spark = SparkSession.builder \
.master("local[*]") \
.appName("usres mobile related information analysis") \
.config("spark.submit.deployMode", "client") \
.config("spark.executor.memory","3g") \
.config("spark.driver.maxResultSize", "1g") \
.config("spark.executor.pyspark.memory","3g") \
.enableHiveSupport() \
.getOrCreate()

handset_info = ora_tmp.select('some_value','some_value','some_value','some_value','some_value','some_value','some_value')

I configure the spark with 3gb execution memory and 3gb execution pyspark memory.My Database has more than 70 Million row. Show i call the

 handset_info.show()

method it is showing the top 20 row in between 2-5 second. But when i try to run the following code

mobile_info_df = handset_info.limit(30)
mobile_info_df.show()

to show the top 30 rows the it takes too much time(3-4 hour). Is it logical to take that much time. Is there any problem in my configuration. Configuration of my laptop is-

  • Core i7(4 core) laptop with 8gb ram
Taimur Islam
  • 960
  • 2
  • 11
  • 25

3 Answers3

9

Spark copies the parameter you passed to limit() to each partition so, in your case, it tries to read 30 rows per partition. I guess you happened to have a huge number of partitions (which is not good in any case). Try df.coalesce(1).limit(30).show() and it should run as fast as df.show().

minhle_r7
  • 771
  • 9
  • 20
  • I am not sure. Spark is usually use for big data, which can easily having 1000 of partitions for parallelization. It is not that useful if you restrict it to single partition (which also means single core). In that case, you might just use pandas. – Louis Yang May 06 '21 at 03:05
  • But, the `.coalesce(1).limit(30)` does work much faster! Thanks! – Louis Yang May 06 '21 at 03:07
  • 1
    The `coalesce()` call only affect the planning of the subsequent calls; it doesn't mean your data is immediately put together into a big block ;-) – minhle_r7 May 07 '21 at 10:22
5

Your configuration is fine. This huge duration difference is caused by underlying implementation. The difference is that limit() reads all of the 70 million rows before it creates a dataframe with 30 rows. Show() in contrast just takes the first 20 rows of the existing dataframe and has therefore only to read this 20 rows. In case you are just interessted in showing 30 instead of 20 rows, you can call the show() method with 30 as parameter:

df.show(30, truncate=False)
cronoik
  • 15,434
  • 3
  • 40
  • 78
  • 2
    Do you know why does `limit()` works that way? It strikes me as rather wasteful... – minhle_r7 Nov 30 '20 at 17:22
  • What you said doesn't seem to be right, see: https://github.com/apache/spark/pull/15070 – minhle_r7 Nov 30 '20 at 17:53
  • @minhle_r7 I don't see how this PR is related to the OP's scenario. The OP creates two dataframes `mobile_info_df` and `handset_info`. The first one should only contain 30 rows and the other one is not limited. That means the whole data needs to be read (and the `show` triggers the reading). I can not test it right now because I don't have a spark environment at the moment, but `mobile_info_df = handset_info.limit(30)` and `handset_info = handset_info.limit(30)` will probably lead to a different execution plan. 1/2 – cronoik Nov 30 '20 at 19:18
  • @minhle_r7 The latter will have a much better performance similar to `show` (as I said probably! I have not tested it.). Maybe you can check it by yourself and edit my answer to improve it. 2/2 – cronoik Nov 30 '20 at 19:19
3

As you've already experienced, limit() with large data has just terrible performance. Wanted to share a workaround for anyone else with this problem. If the limit count doesn't have to be exact, use sort() or orderBy() to sort a column, and use filter() to grab top k% of the rows.

piritocle
  • 318
  • 2
  • 10
  • Could you please provide an example showing how to use `sort()` + `filter()` to grab top 10% rows? – EasonL Aug 27 '21 at 21:45
  • 1
    @EasonL So this wouldn't get you exactly x% because who knows how many rows meet the filter() condition, but you would do something like: `df.sort("score").filter("score > 0.9")` If you have some knowledge over your data distribution.. you could get close to your desired X% or number of rows... – piritocle Aug 29 '21 at 01:18