
What I'm trying to achieve is, for the following DataFrame:

-------------------------
| FOO   | BAR   | BAZ   |
| lorem | ipsum | dolor |
| sit   | amet  | dolor |
| lorem | lorem | dolor |
-------------------------

generate the following output:

Map(
 FOO -> List("lorem", "sit"),
 BAR -> List("ipsum", "amet", "lorem"),
 BAZ -> List("dolor")
)

This is the Scala code that I've come up with:

import org.apache.spark.sql.functions.col

val df = data.distinct

df.columns.map { key =>
  val distinctValues = df
    .select(col(key))
    .collect
    .map(row => row.getString(0))
    .toList
    .distinct
  (key, distinctValues)
}.toMap

I have tried a close alternative to this code using RDDs, and somehow it's about 30% faster, but the problem remains the same: all of this is extraordinarily inefficient.

I am running Spark locally against a local Cassandra instance hosting a sample dataset of only 1000 rows, but these operations generate tons and tons of logs and take more than 7 seconds to complete.

Am I doing something wrong? Is there a better way of doing this?

Bertrand
  • `df.select(df.columns map (c => collect_set(c) as c): _*).first.getValuesMap[Seq[String]](df.columns)` would be a small improvement, but the overall idea is just not scalable, and second-scale latency in Spark on a generic cluster is usually expected. – zero323 Dec 17 '18 at 10:38
  • I think this could be a duplicate question - https://stackoverflow.com/questions/37949494/how-to-count-occurrences-of-each-distinct-value-in-a-column – Leothorn Dec 17 '18 at 11:34
  • @user6910411 that indeed improved the performance a lot, thanks! Could you elaborate on why this is not scalable? Because the output can get too big? – Bertrand Dec 18 '18 at 02:10
  • Well, you collect all the unique values and convert them to a local structure, so it can only work if the final result is small enough to be handled in memory by each node (driver and executors). That could work if you know a priori that the cardinality is small, but let's say someone puts random real numbers in there :) If you are willing to relax your requirements you can [melt](https://stackoverflow.com/q/41670103/6910411) the frame and then just take the distinct values, keeping things distributed. – zero323 Dec 18 '18 at 22:05
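
A sketch of the melt-and-distinct approach described in the comment above, assuming all columns hold strings as in the sample data (the intermediate names column and value are illustrative):

import org.apache.spark.sql.functions.{array, col, explode, lit, struct}

// Turn every row into (column, value) pairs, then deduplicate while staying distributed.
val melted = df
  .select(explode(array(df.columns.map(c =>
    struct(lit(c).as("column"), col(c).as("value"))): _*)).as("kv"))
  .select(col("kv.column").as("column"), col("kv.value").as("value"))
  .distinct()

// Only collect into the requested Map if the distinct values are known to be few.
val result: Map[String, List[String]] = melted
  .collect()
  .groupBy(_.getString(0))
  .map { case (column, rows) => column -> rows.map(_.getString(1)).toList }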

2 Answers


You have a function called `collect_set`:

df.select(collect_set($"FOO"), collect_set($"BAR"), collect_set($"BAZ"))
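
A fuller sketch turning this into the Map from the question, assuming the same df (using collect_set by column name and Row.getValuesMap, as in the comment under the question):

import org.apache.spark.sql.functions.collect_set

// One aggregation over all columns, then read the single result row into a Map.
// Only safe when the distinct values fit comfortably in driver memory.
val distinctPerColumn: Map[String, Seq[String]] =
  df.select(df.columns.map(c => collect_set(c).as(c)): _*)
    .first()
    .getValuesMap[Seq[String]](df.columns)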
Mohd Avais

To identify the unique values of a column:

for (x <- df.columns) {
  // groupBy on the column yields its distinct values (with counts); show() forces evaluation
  df.select(x).groupBy(x).count.show()
}

I think the use of `approx_count_distinct` would make this much faster.

import org.apache.spark.sql.functions.approx_count_distinct
df.agg(approx_count_distinct("some_column"))
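
If approximate counts over every column are enough, a single-pass sketch over all columns of the df from the question might look like this:

import org.apache.spark.sql.functions.approx_count_distinct

// Approximate distinct counts for every column in one aggregation.
val approxCounts = df.agg(
  approx_count_distinct(df.columns.head),
  df.columns.tail.map(c => approx_count_distinct(c)): _*
)
approxCounts.show()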

This is a nice post by zero323 explaining this:

How to count occurrences of each distinct value for every column in a dataframe?

Leothorn
    I think you may have misunderstood my question, as I'm not trying to count distinct values, but to get all those distinct values into a sequence. – Bertrand Dec 18 '18 at 02:04
  • You want to speed up the process; one way is to increase the speed of distinct itself by replacing it with approximate distinct. That's what I am alluding to. – Leothorn Dec 18 '18 at 03:16
  • @Leothorn But that will give counts, while Bertrand wants unique values. – zero323 Dec 18 '18 at 22:06