I use Apache Spark 2.2.0 and Scala.
I'm following the question as a guide to pivot a dataframe without using the pivot function.
I need to pivot the dataframe without using the pivot function because I have non-numerical data, and pivot works with an aggregation function like sum, min, max on numerical data only. I've got a non-numerical column I'd like to use in the pivot aggregation.
Here's my data:
+---+-------------+----------+-------------+----------+-------+
|Qid|     Question|AnswerText|ParticipantID|Assessment| GeoTag|
+---+-------------+----------+-------------+----------+-------+
|  1|Question1Text|       Yes|       abcde1|         0|(x1,y1)|
|  2|Question2Text|        No|       abcde1|         0|(x1,y1)|
|  3|Question3Text|         3|       abcde1|         0|(x1,y1)|
|  1|Question1Text|        No|       abcde2|         0|(x2,y2)|
|  2|Question2Text|       Yes|       abcde2|         0|(x2,y2)|
+---+-------------+----------+-------------+----------+-------+
I want to group by the ParticipantID, Assessment and GeoTag columns, "pivot" on the Question column, and take the values from the AnswerText column. In the end, the output should look as follows:
+-------------+----------+-------+-----+-----+-----+
|ParticipantID|Assessment| GeoTag|Qid_1|Qid_2|Qid_3|
+-------------+----------+-------+-----+-----+-----+
|       abcde1|         0|(x1,y1)|  Yes|   No|    3|
|       abcde2|         0|(x2,y2)|   No|  Yes| null|
+-------------+----------+-------+-----+-----+-----+
I have tried this:
val questions: Array[String] = df.select("Q_id")
  .distinct()
  .collect()
  .map(_.getAs[String]("Q_id"))
  .sortWith(_ < _)

val df2: DataFrame = questions.foldLeft(df) {
  case (data, question) =>
    data.selectExpr("*", s"IF(Q_id = '$question', AnswerText, 0) AS $question")
}
[followed by a GroupBy expression]
But I'm getting the following error, which must have something to do with the syntax of the final AS $question clause:
17/12/08 16:13:12 INFO SparkSqlParser: Parsing command: *
17/12/08 16:13:12 INFO SparkSqlParser: Parsing command: IF(Q_id_string_new_2 = '101_Who_is_with_you_right_now?', AnswerText, 0) AS 101_Who_is_with_you_right_now?
extraneous input '?' expecting <EOF>(line 1, pos 104)
== SQL ==
IF(Q_id_string_new_2 = '101_Who_is_with_you_right_now?', AnswerText, 0) AS 101_Who_is_with_you_right_now?
--------------------------------------------------------------------------------------------------------^^^
org.apache.spark.sql.catalyst.parser.ParseException:
extraneous input '?' expecting <EOF>(line 1, pos 104)
== SQL ==
IF(Q_id_string_new_2 = '101_Who_is_with_you_right_now?', AnswerText, 0) AS 101_Who_is_with_you_right_now?
--------------------------------------------------------------------------------------------------------^^^
at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:217)
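The parser position in the trace points at the trailing ? of the alias, which suggests the unquoted alias is what the SQL parser rejects. A minimal sketch of one possible workaround, assuming the alias is wrapped in backticks so Spark SQL reads it as a single quoted identifier. The object AliasQuoting and the helpers quoteIdent and pivotExpr are hypothetical names, not part of Spark, and the expression is otherwise the same as in the attempt above:

```scala
object AliasQuoting {
  // Hypothetical helper: wrap a name in backticks so it is parsed as one
  // quoted identifier; a backtick inside the name is escaped by doubling it.
  def quoteIdent(name: String): String =
    "`" + name.replace("`", "``") + "`"

  // Hypothetical helper: builds the same IF(...) expression string as the
  // attempt above, changing only the alias so that it is backtick-quoted.
  // Assumption: the question text contains no single quote, which would
  // still break the string literal inside the expression.
  def pivotExpr(question: String): String =
    s"IF(Q_id = '$question', AnswerText, 0) AS ${quoteIdent(question)}"
}
```

The resulting string could be passed to selectExpr in place of the interpolated string in the foldLeft. Every later reference to such a column from SQL text would need the same quoting.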
Any ideas where I am going wrong? Is there a better way? I thought about reverting to Pandas and Python outside Spark if necessary, but I'd rather write all the code within the same framework if possible.
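For reference, a sketch of how the whole reshaping might be done inside Spark without the pivot function and without going through the SQL parser, so special characters in the generated column names are not an issue. It assumes the column names from the sample table (Qid, AnswerText, ParticipantID, Assessment, GeoTag) and at least one distinct Qid value. The object PivotWithoutPivot and the method pivotAnswers are hypothetical names:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, first, when}

object PivotWithoutPivot {
  // Hypothetical helper; column names are assumed from the sample table.
  def pivotAnswers(df: DataFrame): DataFrame = {
    // Collect the distinct question ids on the driver (assumed to be a small set).
    val qids: Array[String] = df.select(col("Qid").cast("string"))
      .distinct()
      .collect()
      .map(_.getString(0))
      .sorted

    // One aggregate per question id: the first non-null AnswerText among the
    // rows of the group whose Qid matches, and null when there is none.
    val answerColumns = qids.map { q =>
      first(when(col("Qid").cast("string") === q, col("AnswerText")), true)
        .as(s"Qid_$q") // Column.as takes the alias as a plain string
    }

    // Assumption: qids is non-empty, otherwise answerColumns.head throws.
    df.groupBy("ParticipantID", "Assessment", "GeoTag")
      .agg(answerColumns.head, answerColumns.tail: _*)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("pivot-sketch")
      .getOrCreate()
    import spark.implicits._

    // The sample rows from the question.
    val df = Seq(
      (1, "Question1Text", "Yes", "abcde1", 0, "(x1,y1)"),
      (2, "Question2Text", "No", "abcde1", 0, "(x1,y1)"),
      (3, "Question3Text", "3", "abcde1", 0, "(x1,y1)"),
      (1, "Question1Text", "No", "abcde2", 0, "(x2,y2)"),
      (2, "Question2Text", "Yes", "abcde2", 0, "(x2,y2)")
    ).toDF("Qid", "Question", "AnswerText", "ParticipantID", "Assessment", "GeoTag")

    pivotAnswers(df).orderBy("ParticipantID").show()
    spark.stop()
  }
}
```

The first aggregate function is not restricted to numeric columns, so groupBy(...).pivot("Qid").agg(first("AnswerText")) may also be worth trying on string data.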