
I have the following DataFrame:

    |-----id-------|----value------|-----desc------|
    |     1        |     v1        |      d1       |
    |     1        |     v2        |      d2       |
    |     2        |     v21       |      d21      |
    |     2        |     v22       |      d22      |
    |--------------|---------------|---------------|

I want to transform it into:

    |-----id-------|----value------|-----desc------|
    |     1        |     v1;v2     |      d1;d2    |
    |     2        |     v21;v22   |      d21;d22  |
    |--------------|---------------|---------------|
  • Is it possible through DataFrame operations?
  • What would an RDD transformation look like in this case?

I presume rdd.reduce is the key, but I have no idea how to adapt it to this scenario.

Silverrose
  • You want `value` column in result to be `StringType` or `ArrayType` column? – Odomontois Dec 08 '15 at 09:01
  • In Spark < 1.6 you can use an UDAF: [SPARK SQL replacement for mysql GROUP_CONCAT aggregate function](http://stackoverflow.com/a/32750733/1560062). – zero323 Dec 08 '15 at 10:51

4 Answers


You can transform your data using Spark SQL:

import sqlContext.implicits._

case class Test(id: Int, value: String, desc: String)

val data = sc.parallelize(Seq((1, "v1", "d1"), (1, "v2", "d2"), (2, "v21", "d21"), (2, "v22", "d22")))
  .map(line => Test(line._1, line._2, line._3))
  .toDF()

data.registerTempTable("data")
val result = sqlContext.sql(
  "select id, concat_ws(';', collect_list(value)) as value, concat_ws(';', collect_list(`desc`)) as `desc` from data group by id")
result.show
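
Equivalently, without going through a SQL string, the same aggregation can be expressed with the DataFrame API (a minimal sketch, assuming Spark 1.6+ where `collect_list` is available as a DataFrame function; the aliases `value` and `desc` are just illustrative):

    import org.apache.spark.sql.functions.{collect_list, concat_ws}

    // Group by id, collect each column into a list, then join the elements with ";".
    val resultDf = data.groupBy("id").agg(
      concat_ws(";", collect_list("value")).as("value"),
      concat_ws(";", collect_list("desc")).as("desc"))
    resultDf.show()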
Kaushal
  • Interesting. I see [`collect_list` marked as `@since 1.6.0`](https://github.com/nburoojy/spark/blob/07de8a2f65b205b0d157301e097beb4950448cf0/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L185) – Odomontois Dec 08 '15 at 10:10
  • 2
    Weird, I am using Spark 1.6.1! When I am doing the same it is saying : undefined function collect_list; I also added the functions._ import – Murtaza Kanchwala Jun 06 '16 at 12:07
  • Are you using the **collect_list** function inside a SQL query or with a DataFrame? – Kaushal Jun 06 '16 at 12:17
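
(Side note, not from the thread: in Spark 1.x the `collect_list`/`collect_set` functions are backed by Hive UDAFs, so an "undefined function collect_list" error usually means a plain `SQLContext` is in use and a `HiveContext` is needed, e.g.:)

    import org.apache.spark.sql.hive.HiveContext

    // Assumption: in Spark 1.5/1.6 collect_list requires Hive support to be enabled.
    val sqlContext = new HiveContext(sc)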

Suppose you have something like this:

import scala.util.Random
import org.apache.spark.sql.SQLContext

val sqlc: SQLContext = ???

case class Record(id: Long, value: String, desc: String)

val testData = for {
    (i, j) <- List.fill(30)(Random.nextInt(5), Random.nextInt(5))
  } yield Record(i, s"v$i$j", s"d$i$j")

val df = sqlc.createDataFrame(testData)

You can easily aggregate the data as follows:

import sqlc.implicits._

// For a given column, build an RDD of (id, all values of that column for the id)
def aggConcat(col: String) = df
      .map(row => (row.getAs[Long]("id"), row.getAs[String](col)))
      .aggregateByKey(Vector[String]())(_ :+ _, _ ++ _)

// Aggregate both columns separately, then zip the two RDDs back into one row per id
val result = aggConcat("value").zip(aggConcat("desc")).map{
      case ((id, value), (_, desc)) => (id, value, desc)
    }.toDF("id", "values", "descs")

If you would like concatenated strings instead of arrays, you can then run:

import org.apache.spark.sql.functions._

val resultConcat =  result
      .withColumn("values", concat_ws(";", $"values"))
      .withColumn("descs" , concat_ws(";", $"descs" ))
Odomontois

If working with DataFrames, use a UDAF:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType}

// Aggregates a single String column into one separator-joined String per group.
class ConcatStringsUDAF(inputColumnName: String, sep: String = ",") extends UserDefinedAggregateFunction {
  def inputSchema: StructType = StructType(StructField(inputColumnName, StringType) :: Nil)
  def bufferSchema: StructType = StructType(StructField("concatString", StringType) :: Nil)
  def dataType: DataType = StringType
  def deterministic: Boolean = true
  def initialize(buffer: MutableAggregationBuffer): Unit = buffer(0) = ""

  // Null-safe concatenation of two pieces with the separator.
  private def concatStrings(str1: String, str2: String): String = {
   (str1, str2) match {
      case (s1: String, s2: String) => Seq(s1, s2).filter(_ != "").mkString(sep)
      case (null, s: String) => s
      case (s: String, null) => s
      case _ => ""
    }
  }
  // Fold one input row's value into the running buffer.
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val acc1 = buffer.getAs[String](0)
    val acc2 = input.getAs[String](0)
    buffer(0) = concatStrings(acc1, acc2)
  }

  // Combine two partial buffers (e.g. from different partitions).
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val acc1 = buffer1.getAs[String](0)
    val acc2 = buffer2.getAs[String](0)
    buffer1(0) = concatStrings(acc1, acc2)
  }

  def evaluate(buffer: Row): Any = buffer.getAs[String](0)
}

And then use it this way:

val stringConcatener = new ConcatStringsUDAF("Category_ID", ",")
data.groupBy("aaid", "os_country").agg(stringConcatener(data("X")).as("Xs"))

As of Spark 1.6, have a look at Datasets and the Aggregator API.
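
That route is only hinted at above, so here is a rough sketch (not from the original answer, written against the Spark 2.x `Aggregator` signature, which differs slightly from the 1.6 preview) of a typed aggregator for one column:

    import org.apache.spark.sql.{Encoder, Encoders}
    import org.apache.spark.sql.expressions.Aggregator

    case class Record(id: Long, value: String, desc: String)

    // Concatenates the `value` field of each Record in a group with ";".
    object ConcatValue extends Aggregator[Record, String, String] {
      def zero: String = ""
      def reduce(acc: String, r: Record): String =
        if (acc.isEmpty) r.value else acc + ";" + r.value
      def merge(a: String, b: String): String =
        if (a.isEmpty) b else if (b.isEmpty) a else a + ";" + b
      def finish(acc: String): String = acc
      def bufferEncoder: Encoder[String] = Encoders.STRING
      def outputEncoder: Encoder[String] = Encoders.STRING
    }

    // Typed usage (assumes `ds: Dataset[Record]` and `import spark.implicits._`):
    // ds.groupByKey(_.id).agg(ConcatValue.toColumn)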

Boris

After some research I've come up with something like this:

    import sqlContext.implicits._

    val data = sc.parallelize(
      List(
        ("1", "v1", "d1"),
        ("1", "v2", "d2"),
        ("2", "v21", "d21"),
        ("2", "v22", "d22")))
      .map{ case (id, value, desc) => (id, (value, desc)) }
      // concatenate both tuple elements per key (note: y._2, not x._2, for the desc column)
      .reduceByKey((x, y) => (x._1 + ";" + y._1, x._2 + ";" + y._2))
      .map{ case (id, (value, desc)) => (id, value, desc) }
      .toDF("id", "value", "desc")

    data.show()

that leaves me with:

    +---+-------+-------+
    | id|  value|   desc|
    +---+-------+-------+
    |  1|  v1;v2|  d1;d2|
    |  2|v21;v22|d21;d22|
    +---+-------+-------+
Silverrose