
I have a Spark Scala DataFrame and need to filter the rows based on a condition and select the count.

  val filter = df.groupBy("user").count().alias("cnt")
  val count = filter.filter(col("user") === ("subscriber").select("cnt")

The error I am facing is `value select is not a member of org.apache.spark.sql.Column`. Also, for some reason, `count` is a `Dataset[Row]`. Any thoughts on how to get the count in a single line?

sparkscala

2 Answers


`Dataset[Row]` is `DataFrame`.

In Spark, `DataFrame` is just a type alias for `Dataset[Row]`, so there is no need to worry: what you are getting back is a DataFrame.

See this for a better understanding: Difference between DataFrame, Dataset, and RDD in Spark.
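For illustration, here is a minimal sketch (assuming your `df` and an active SparkSession) of how the types relate: `DataFrame` is literally an alias for `Dataset[Row]`, while `RDD[Row]` is a separate type you only reach by calling `.rdd`.

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, Dataset, Row}

    val grouped: DataFrame = df.groupBy("user").count()  // groupBy().count() returns a DataFrame
    val asDataset: Dataset[Row] = grouped                // compiles unchanged: DataFrame = Dataset[Row]
    val asRdd: RDD[Row] = grouped.rdd                    // RDD[Row] is a different type, obtained via .rdd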

Regarding `select is not a member of org.apache.spark.sql.Column`: it is purely a compile error.

  val filter = df.groupBy("user").count().alias("cnt")
  val count = filter.filter(col("user") === ("subscriber"))
    .select("cnt")

will work, since you were missing the `)` that closes the call to `filter`.
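If the goal is the numeric count in a single line (as the question asks), here is a hedged sketch, not from the original answer: it assumes `df` has a string column `user`, relies on `groupBy(...).count()` producing a `count` column of type Long, and `first()` assumes at least one matching row exists.

    import org.apache.spark.sql.functions.col

    val subscriberCount: Long = df
      .groupBy("user").count()                 // produces columns: user, count (Long)
      .filter(col("user") === "subscriber")
      .select("count")
      .first()                                 // throws if no "subscriber" row exists
      .getLong(0)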

Ram Ghadiyaram

You are missing a `)` before `.select`. Please check the code below.

The `Column` class doesn't have a `.select` method; you have to invoke `select` on a DataFrame.

  val filter = df.groupBy("user").count().alias("cnt")
  val count = filter.filter(col("user") === "subscriber").select("cnt")

Srinivas
  • Still I am getting an error: type mismatch; found: org.apache.spark.sql.DataFrame (which expands to org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]), required: Int – sparkscala Apr 24 '20 at 16:10
  • The filter DataFrame gives this output: | user | count | → | subscriber | 267 |. That is the input for the count variable, and val count is a Dataset[Row] – sparkscala Apr 24 '20 at 16:17
  • For example, if we have one more statement like val count = filter.filter(col("user") === "Consumer").select("cnt") and we don't have data for Consumer in the filter DataFrame, the Spark job will fail. How do I handle this scenario? – sparkscala Apr 24 '20 at 17:17
  • No, it will not fail. It will simply return an empty DataFrame. – Srinivas Apr 24 '20 at 17:22
  • I am trying to convert to an RDD and write to a variable as below: val count = filter.filter(col("user") === "Consumer").select("cnt").rdd.map(x => x(0)).collect()(0). So my job is failing since there is no data. Any suggestions? – sparkscala Apr 24 '20 at 17:27
  • Why are you converting to an RDD? Can you post your full code? – Srinivas Apr 24 '20 at 17:40
  • If I want that count then I have to convert to an RDD, right? val Consumercount = filter.filter(col("user") === "Consumer").select("cnt").rdd.map(x => x(0)).collect()(0) val subscribercount = filter.filter(col("user") === "subscriber").select("cnt").rdd.map(x => x(0)).collect()(0). Like this I have 10 counts, and all 10 have to be written to 10 variables. – sparkscala Apr 24 '20 at 17:54
  • There are multiple ways to do that. You can check this: val Consumercount = filter.filter(col("user") === "Consumer").select("cnt").map(_.getAs[Int](0)).collect val subscribercount = filter.filter(col("user") === "subscriber").select("cnt").map(_.getAs[Int](0)).collect You will get the result in an array, which you can then process. – Srinivas Apr 24 '20 at 17:58
  • For some reason it's not accepting map(_.getAs[Int](0)).collect and it's giving an error: ◾ Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases. ◾ not enough arguments for method map: (implicit evidence$6: org.apache.spark.sql.Encoder[U])org.apache.spark.sql.Dataset[U]. Unspecified value parameter evidence$6. – sparkscala Apr 24 '20 at 21:48
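Pulling the comment thread together, here is a hedged sketch (the helper `countFor` is purely illustrative, not from the thread) of one way to read the counts back without going through RDDs: import `spark.implicits._` so the Dataset `map`/`as` calls have an Encoder, decode the `count` column as Long (its actual type, which is why getAs[Int] misbehaves), and use `headOption` so a user with no rows yields a default instead of failing on `collect()(0)`.

    import org.apache.spark.sql.functions.col
    import spark.implicits._          // provides the Encoder[Long] needed below

    // Hypothetical helper, assuming `df` with a string column `user`.
    def countFor(user: String): Long =
      df.groupBy("user").count()
        .filter(col("user") === user)
        .select("count")
        .as[Long]                     // Dataset[Long]; requires spark.implicits._ in scope
        .collect()
        .headOption                   // Some(count), or None when the user is absent
        .getOrElse(0L)                // empty result => 0 instead of an exception

    val consumerCount   = countFor("Consumer")    // 0L when there is no Consumer data
    val subscriberCount = countFor("subscriber")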