Spark version 1.6.0, Scala version 2.10.5.

I have a spark-sql dataframe df like this,

+---------------+--------------------------------+
|address        | attributes                     |
+---------------+--------------------------------+
|1314 44 Avenue | Tours, Mechanics, Shopping     |
|115 25th Ave   | Restaurant, Mechanics, Brewery |
+---------------+--------------------------------+

From this dataframe, I would like values as below,

Tours, Mechanics, Shopping, Brewery

If I do this,

df.select(df("attributes")).collect().foreach(println)

I get,

[Tours, Mechanics, Shopping]
[Restaurant, Mechanics, Brewery]

I thought I could use flatMap, but instead I found explode, so I tried to put the result into a variable using,

val allValues = df.withColumn(df("attributes"), explode("attributes"))

but I am getting an error:

error: type mismatch;
 found   : org.apache.spark.sql.Column
 required: String

My thinking was that if I can get output using explode, I can then use distinct to get the unique values after flattening them.

How can I get the desired output?

user9431057
  • Anyone downvoting, please give your reasoning (especially for newcomers). That gives us guidance to correct ourselves and encourages us to learn, and it makes visible to everyone what is wrong. No offense, we all learn together and learn from mistakes. (There is documentation on how to ask good questions, but sometimes it's just difficult and one needs a little push :) ) – user9431057 Nov 25 '18 at 16:39

2 Answers

I strongly recommend using Spark 2.x. On Cloudera, issuing "spark-shell" launches the 1.6.x version, whereas "spark2-shell" gives you the 2.x shell. Check with your admin.

But if you need an RDD-based solution on Spark 1.6, try this.

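// In the Spark 1.6 shell the implicits live on sqlContext (on Spark 2.x use spark.implicits._)
import sqlContext.implicits._
import scala.collection.mutable
val df = Seq(("1314 44 Avenue", Array("Tours", "Mechanics", "Shopping")),
             ("115 25th Ave", Array("Restaurant", "Mechanics", "Brewery"))).toDF("address", "attributes")
// An array column comes back from a Row as a WrappedArray; flatten all rows, then de-duplicate
df.rdd.flatMap( x => x.getAs[mutable.WrappedArray[String]]("attributes") ).distinct().collect.foreach(println)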

Results:

Brewery
Shopping
Mechanics
Restaurant
Tours

If the "attributes" column is not an array but a comma-separated string, use the version below, which gives the same results.

val df = Seq(("1314 44 Avenue","Tours,Mechanics,Shopping"),
  ("115 25th Ave","Restaurant,Mechanics,Brewery")).toDF("address","attributes")
df.rdd.flatMap( x => x.getAs[String]("attributes").split(",") ).distinct().collect.foreach(println)
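
If the real data has a space after each comma, as in the question's table ("Tours, Mechanics, Shopping"), a plain split(",") would leave leading spaces on the values. Below is a minimal sketch of the splitting logic on plain Scala collections, so it can be tried without a cluster. The object and method names are made up for illustration.

```scala
object AttributeSplit {
  // Split one comma-separated attributes string into trimmed, non-empty values.
  def splitAttributes(raw: String): Seq[String] =
    raw.split(",").map(_.trim).filter(_.nonEmpty).toSeq

  // Flatten all rows and keep the first occurrence of each value.
  def distinctAttributes(rows: Seq[String]): Seq[String] =
    rows.flatMap(splitAttributes).distinct
}
```

The same helper should be usable inside the RDD version, for example df.rdd.flatMap( x => AttributeSplit.splitAttributes(x.getAs[String]("attributes")) ).distinct(), assuming the column really is a string.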
stack0114106

The problem is that withColumn expects a String as its first argument (the name of the added column), but you're passing it a Column here: df.withColumn(df("attributes"), ...). You only need to pass "attributes" as a String.

Additionally, explode expects a Column, but you're passing it a String. To turn it into a Column you can use df("columnName") or the Scala shorthand $ syntax, $"columnName".

Hope this example can help you.

import org.apache.spark.sql.functions._
import sqlContext.implicits._ // enables the $"..." syntax (spark.implicits._ on Spark 2.x)
val allValues = df.select(explode($"attributes").as("attributes")).distinct

Note that this will only preserve the attributes Column, since you want the distinct elements on that one.
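
If attributes turns out to be a plain comma-separated string rather than an array, explode fails with "input to function explode should be array or map type, not StringType". One way around that, sketched here under the assumption that the column is a StringType named attributes, is to split it into an array first. The object and method names are hypothetical; split, explode, trim and col come from org.apache.spark.sql.functions.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, explode, split, trim}

object DistinctAttributes {
  // Assumes `attributes` is a StringType column such as "Tours, Mechanics, Shopping".
  def distinctAttributes(df: DataFrame): DataFrame =
    df.select(explode(split(col("attributes"), ",")).as("attribute")) // one row per value
      .select(trim(col("attribute")).as("attribute"))                  // drop spaces around values
      .distinct()                                                      // unique over the whole column
}
```

Calling DistinctAttributes.distinctAttributes(df).collect().foreach(println) would then print one row per unique attribute, in no guaranteed order.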

  • Hi @user9431057, I updated the answer, hope it helps now. I would also suggest looking for an introductory Scala tutorial; there are plenty on the internet, many of them aimed at Spark newcomers. – Luis Miguel Mejía Suárez Nov 25 '18 at 01:57
  • @user9431057 which **Spark** & **Scala** versions are you using? – Luis Miguel Mejía Suárez Nov 25 '18 at 02:08
  • I am using **Spark** `1.6.0`, **Scala** `version 2.10.5` – user9431057 Nov 25 '18 at 02:10
  • @user9431057 ah, that explains it. The problem is that `array_distinct` is a new function added in Spark `2.4.0`. But I also noticed that it wouldn't solve your problem, since you need the unique values over the entire column, not just within each array. I will update the answer. – Luis Miguel Mejía Suárez Nov 25 '18 at 02:16
  • Sorry still getting an error `cannot resolve explode ("attributes") due to data type mismatch. input to function explode should be array or map type, not StringType` – user9431057 Nov 25 '18 at 02:32
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/184187/discussion-between-luis-miguel-mejia-suarez-and-user9431057). – Luis Miguel Mejía Suárez Nov 25 '18 at 02:33