0

Spark Dataframe formate conversion from input dataframe 1

|------|--------|-----------|---------------|------------- |
|city   product | Jan(sale)  |    Feb(sale) |    Mar(sale)|
|---------------|------------|--------------|-------------|
|c1   |   p1    |   123      |       22     |    34       |
|---------------|------------|--------------|-------------|
|c2   |   p2    |      234   |      432     |      43     |
|---------------|------------|--------------|-------------|

to the output dataframe2 as transpose of the entire row and column as shown below.

|city  |    product   |  metric_type    metric_value|
--------------------------------------------------- |
|  c1   |    p1     |      Jan   |     123          |
----------------------------------------------------
| c1   |     p1     |     Feb    |     22           |
-----------------------------------------------------
| c1   |     p1     |      Mar   |     34           |
|  --------------------------------------------------
sathya
  • 1,982
  • 1
  • 20
  • 37
prakash
  • 419
  • 2
  • 4
  • 13
  • 1
    What is the question? – Assaf Mendelson Jul 19 '17 at 12:51
  • Hi Assaf, convert first dataframe to second dataframe without using spark sql – prakash Jul 20 '17 at 04:41
  • what do you mean without using spark sql? – Assaf Mendelson Jul 20 '17 at 04:55
  • i mean we need solution without using spark sql ,only use dataset API method – prakash Jul 20 '17 at 05:26
  • use methods on this url https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset – prakash Jul 20 '17 at 05:28
  • I offered a solution with dataset only, however, I believe that using dataset only is a mistake. First a dataframe and dataset are interchangeable so you can use element son one and then the next easily, second dataframe can achieve much better performance as it allows a lot of optimization, Third dataframe can handle more dynamic data (if for example you had 100 months then doing it with dataset only operators would be very bad while in dataframe you can easily turn the columns into an array). Lastly, I am not sure what a "dataset" only actually means as all dataframe API is also dataset. – Assaf Mendelson Jul 20 '17 at 06:23
  • Possible duplicate of [Transpose column to row with Spark](https://stackoverflow.com/questions/37864222/transpose-column-to-row-with-spark) – Davis Broda Jul 31 '17 at 18:06

2 Answers2

1

A Dataset only solution would look like this:

case class orig(city: String, product: String, Jan: Int, Feb: Int, Mar: Int)
case class newOne(city: String, product: String, metric_type: String, metric_value: Int)
val df = Seq(("c1", "p1", 123, 22, 34), ("c2", "p2", 234, 432, 43)).toDF("city", "product", "Jan", "Feb", "Mar")
val newDf = df.as[orig].flatMap(v => Seq(newOne(v.city, v.product, "Jan", v.Jan), newOne(v.city, v.product, "Feb", v.Feb), newOne(v.city, v.product, "Mar", v.Mar)))
newDf.show()
>>+----+-------+-----------+-----------+
>>|city|product|metric_type|metric_value|
>>+----+-------+-----------+-----------+
>>|  c1|     p1|        Jan|        123|
>>|  c1|     p1|        Feb|         22|
>>|  c1|     p1|        Mar|         34|
>>|  c2|     p2|        Jan|        234|
>>|  c2|     p2|        Feb|        432|
>>|  c2|     p2|        Mar|         43|
>>+----+-------+-----------+-----------+

Using dataframe API

While the OP asked specifically for dataset only without spark sql, for others who look at this question, I believe a dataframe solution should be used.

First it is important to understand that dataset API is part of the spark SQL API. Datasets and dataframes are interchangeable and actually dataframe is simply a DataSet[Row]. While dataset has both "typed" and "untyped" API, ignoring some of the API seems wrong to me.

Second, pure "typed" option has limitations. For example, if we had 100 months instead of 3 then doing it the way above would be impractical.

Lastly, Spark provides a lot of optimization on dataframes which are unavaiable when using typed API (as the typed API is opaque to Spark) and therefore in many cases would get worse performance.

I would suggest using the following dataframe solution:

 val df = Seq(("c1", "p1", 123, 22, 34), ("c2", "p2", 234, 432, 43)).toDF("city", "product", "Jan", "Feb", "Mar")
 val months  = Seq("Jan", "Feb", "Mar")
 val arrayedDF = df.withColumn("combined", array(months.head, months.tail: _*))_*)).select("city", "product", "combined")
 val explodedDF = arrayedDF.selectExpr("city", "product", "posexplode(combined) as (pos, metricValue)")
 val u =  udf((p: Int) => months(p))
 val targetDF = explodedDF.withColumn("metric_type", u($"pos")).drop("pos")
 targetDF.show()
>>+----+-------+-----------+-----------+
>>|city|product|metricValue|metric_type|
>>+----+-------+-----------+-----------+
>>|  c1|     p1|        123|        Jan|
>>|  c1|     p1|         22|        Feb|
>>|  c1|     p1|         34|        Mar|
>>|  c2|     p2|        234|        Jan|
>>|  c2|     p2|        432|        Feb|
>>|  c2|     p2|         43|        Mar|
>>+----+-------+-----------+-----------+

While this is a little longer, it handles the more generic case.

Assaf Mendelson
  • 12,701
  • 5
  • 47
  • 56
  • Hi Assaf, df.as[orig] not working ,compile time error ◾Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases. ◾not enough arguments for method as: (implicit evidence$2: org.apache.spark.sql.Encoder[orig])org.apache.spark.sql.Dataset[orig]. Unspecified value parameter evidence$2. – prakash Jul 20 '17 at 06:53
  • Then the code I provided should work out of the box (I ran it on 2.1.1 but that shouldn't be an issue). Could it be that your original dataframe/dataset has an incompatible schema? If you created the dataset from something and the case class I provided is not compatible you might get an error. – Assaf Mendelson Jul 20 '17 at 09:26
  • Hi Assaf,second code provided by you is working,but i want to generalise it for all month of year val months = Seq("Jan", "Feb", "Mar","Apr","May","June","Jul","Aug","Sep","Oct","Nov","Dec") ,may be in my csv not all month present? ,please give me solution – prakash Jul 20 '17 at 10:47
  • If you have only columns for some of the month in the dataframe you can do one of two things: either remove missing columns from your months sequence (e.g. months.filter(m => df.columns.contains(m))) or (especially if you are combining multiple dataframes) you can add missing months like this: val newDF = df.withColumn("missing month name", lit(XXX)) where XXX is some value to represent missing data (e.g. null) and then filter it out. – Assaf Mendelson Jul 20 '17 at 11:09
0

You need to transform the data frame from wide to long format (or gather columns or unpivot the data frame), one option is use flatMap:

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

val df = Seq(("c1", "p1", 123, 22, 34), ("c2", "p2", 234, 432, 43)).toDF("city", "product", "Jan", "Feb", "Mar")  

df.show
+----+-------+---+---+---+
|city|product|Jan|Feb|Mar|
+----+-------+---+---+---+
|  c1|     p1|123| 22| 34|
|  c2|     p2|234|432| 43|
+----+-------+---+---+---+

// schema of the result data frame
val schema = StructType(List(StructField("city", StringType, true), 
                             StructField("product", StringType,true), 
                             StructField("metric_type", StringType, true), 
                             StructField("metric_value", IntegerType, true)))

val months = List("Jan", "Feb", "Mar")    
val index = List("city", "product")

// use flatMap to convert each row into three rows
val rdd = df.rdd.flatMap(
    row => {
        val index_values = index.map(i => row.getAs[String](i))
        months.map(m => Row.fromSeq(index_values ++ List(m, row.getAs[Int](m))))
    }
)

spark.createDataFrame(rdd, schema).show
+----+-------+-----------+------------+
|city|product|metric_type|metric_value|
+----+-------+-----------+------------+
|  c1|     p1|        Jan|         123|
|  c1|     p1|        Feb|          22|
|  c1|     p1|        Mar|          34|
|  c2|     p2|        Jan|         234|
|  c2|     p2|        Feb|         432|
|  c2|     p2|        Mar|          43|
+----+-------+-----------+------------+
Psidom
  • 209,562
  • 33
  • 339
  • 356