
I'm using Spark with the Java connector to process my data.

One of the essential operations I need to perform on the data is counting the number of records (rows) within a DataFrame.

I tried df.count(), but the execution time is extremely slow (30-40 seconds for 2-3M records).

Also, due to system requirements, I don't want to use the df.rdd().countApprox() API, because we need the exact count.

Could somebody suggest any alternative that returns exactly the same result as df.count() does, with faster execution time?

Highly appreciate your replies.

tuancoltech
  • I do not think there are any better alternatives. However, you could consider using `cache()` if you perform multiple actions on the dataframe, see https://stackoverflow.com/questions/45749580/same-set-of-tasks-are-repeated-in-multiple-stages-in-a-spark-job/45749730#45749730 – Shaido Aug 30 '17 at 06:35
  • Actually we mostly have only one count action, performed in different datasets we got from different queries. Thanks though, Shaido – tuancoltech Aug 30 '17 at 06:40
  • I believe the data is not actually loaded before you perform an action on it (not a transformation). Hence, the running time could partly be due to reading data from disk or database. – Shaido Aug 30 '17 at 06:45
  • To be specific, the data is being read from memory. We're using Spark SQL to read data from Aerospike set whose data is stored in memory only. The query itself is a transformation as far as I know, which is very fast. It's only extremely slow because of .count API which is an action. So just curious if we have a faster way to do that count thing. – tuancoltech Aug 30 '17 at 06:49
  • Did you try using `registerTempTable` and then doing a `select count(*)`, or maybe `df.selectExpr("count(*)").show`? I'm guessing it would essentially be the same, but you never know. :D – philantrovert Aug 30 '17 at 06:59
  • Yes, I called createOrReplaceTempView, which I believe is the same as registerTempTable, prior to the query statement. I also tried both ways: select * from .. then .count, and select count(*) then extracting the count result value, but the time is the same. – tuancoltech Aug 30 '17 at 07:02
  • Yes, the query itself appears very fast, as no data retrieval is actually done before the call to `count()`. The `count()` itself should be relatively fast; you could try it out by caching the data and calling count twice. The first time (with data retrieval) will probably be slow, while the second one (only performing the actual `count()` operation) should be markedly faster. – Shaido Aug 30 '17 at 07:12
  • Yes, it's faster, but unfortunately it's not our case. Thanks though. – tuancoltech Aug 30 '17 at 07:18
  • If you're just counting the records in Aerospike, does it provide that functionality for you, avoiding the need to load the data into Spark and counting there? – puhlen Aug 31 '17 at 05:44
  • Hi puhlen, you are correct. Just today I found out that we can use the Aerospike client to count records from Aerospike much faster than loading them into a Spark dataframe and counting there. Thanks a lot. – tuancoltech Aug 31 '17 at 06:22

4 Answers

df.cache
df.count

It will be slow the first time, since the data is cached during that first execution of count, but subsequent counts will be fast.

Whether df.cache helps depends on the use case.
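A rough sketch of the pattern (assuming a DataFrame df has already been built from your source):

    // `df` is assumed to be an existing DataFrame
    df.cache()               // marks the DataFrame for caching (lazy, no work yet)
    val first = df.count()   // slow: scans the source and populates the cache
    val second = df.count()  // fast: served from the cached partitions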

maxmithun
  • As far as I understand, your approach will be faster in case we call multiple actions on the same dataset, as pointed out by Shaido in the comments above. However, we only have one action for each dataset. After that, we do a fresh query to get a new dataset and call another count action on that. So I'm afraid the approach will not help much in our case. – tuancoltech Aug 30 '17 at 06:57

A simple way to check whether a dataframe has rows is to do a Try(df.head). If it's a Success, there's at least one row in the dataframe; if it's a Failure, the dataframe is empty. Here's a Scala implementation of this.
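For instance, a minimal version of that check (df assumed to be an existing DataFrame):

    import scala.util.{Failure, Success, Try}

    // df.head throws on an empty DataFrame, so Try captures both cases
    val hasRows: Boolean = Try(df.head) match {
      case Success(_) => true   // at least one row
      case Failure(_) => false  // empty DataFrame
    }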

Here is the reason why df.count() is a slow operation.

Sohum Sachdev

Count is very fast. You need to look at some of your other operations: the data loading and the transformations you perform to generate the DataFrame you are counting. That is the part slowing you down, not the count itself.

If you can reduce the amount of data you load, or cut out any transformations that don't affect the count, you may be able to speed things up. If that's not an option, you may be able to write your transformations more efficiently. Without knowing your transformations, though, it's not possible to say what the bottleneck might be.
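As a hypothetical illustration (the column names and the skipped transformation below are invented, not from the question):

    import spark.implicits._  // assumes an existing SparkSession named `spark`

    val n = df
      .filter($"status" === "active")  // keep filters: they change the row count
      // .withColumn("enriched", expensiveUdf($"payload"))  // derived columns
      //   can be skipped: count() never reads them
      .count()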

puhlen
  • Thanks puhlen. Like I mentioned above, I was loading data into a Spark dataframe to make further queries and count it, which is unnecessary. Now I'm using the Aerospike client to do the job, and I think that is exactly what I'm looking for. – tuancoltech Aug 31 '17 at 06:23

I just found out that loading data into a Spark data frame for further queries and counting is unnecessary.

Instead, we can use the Aerospike client to do the job, and it's much faster than the above approach.

Here's a reference on how to use the Aerospike client: http://www.aerospike.com/launchpad/query_multiple_filters.html
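For the record-counting part specifically, one possible sketch using the Aerospike Java client from Scala (the host, namespace, and set names below are placeholders, not from the question):

    import java.util.concurrent.atomic.AtomicLong

    import com.aerospike.client.{AerospikeClient, Key, Record, ScanCallback}
    import com.aerospike.client.policy.ScanPolicy

    val client = new AerospikeClient("127.0.0.1", 3000)  // placeholder host
    val counter = new AtomicLong(0L)

    val policy = new ScanPolicy()
    policy.includeBinData = false  // we only need keys to count, not bin data

    // scanAll visits every record in the set, invoking the callback once per record
    client.scanAll(policy, "test", "mySet", new ScanCallback {
      override def scanCallback(key: Key, record: Record): Unit = {
        counter.incrementAndGet()
      }
    })

    println(s"Exact record count: ${counter.get}")
    client.close()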

Thanks, everyone.

tuancoltech