4

I have a dataframe with N fields as mentioned below. The number of columns and the length of the values will vary.

Input Table:

+--------------+-----------+-----------------------+
|Date          |Amount     |Status                 |
+--------------+-----------+-----------------------+
|2019,2018,2017|100,200,300|IN,PRE,POST            |
|2018          |73         |IN                     |
|2018,2017     |56,89      |IN,PRE                 |
+--------------+-----------+-----------------------+

I have to convert it into the below format with one sequence column.

Expected Output Table:

+----+------+------+--------+
|Date|Amount|Status|Sequence|
+----+------+------+--------+
|2019|100   |IN    |1       |
|2018|200   |PRE   |2       |
|2017|300   |POST  |3       |
|2018|73    |IN    |1       |
|2018|56    |IN    |1       |
|2017|89    |PRE   |2       |
+----+------+------+--------+

I have tried using explode, but explode only takes one array at a time.

var df = dataRefined.withColumn("TOT_OVRDUE_TYPE", explode(split($"TOT_OVRDUE_TYPE", "\\,"))).toDF

var df1 = df.withColumn("TOT_OD_TYPE_AMT", explode(split($"TOT_OD_TYPE_AMT", "\\,"))).show

Does someone know how I can do it? Thank you for your help.

Prasad Khode
user2427413

6 Answers

1

Yes, I personally also find explode a bit annoying and in your case I would probably go with a flatMap instead:

import spark.implicits._
import org.apache.spark.sql.Row
val df = spark.sparkContext.parallelize(Seq(
  (Seq(2019, 2018, 2017), Seq(100, 200, 300), Seq("IN", "PRE", "POST")),
  (Seq(2018), Seq(73), Seq("IN")),
  (Seq(2018, 2017), Seq(56, 89), Seq("IN", "PRE")))).toDF()

val transformedDF = df
  .flatMap{case Row(dates: Seq[Int], amounts: Seq[Int], statuses: Seq[String]) =>
     dates.indices.map(index => (dates(index), amounts(index), statuses(index), index+1))}
  .toDF("Date", "Amount", "Status", "Sequence")

Output:

transformedDF.show
+----+------+------+--------+
|Date|Amount|Status|Sequence|
+----+------+------+--------+
|2019|   100|    IN|       1|
|2018|   200|   PRE|       2|
|2017|   300|  POST|       3|
|2018|    73|    IN|       1|
|2018|    56|    IN|       1|
|2017|    89|   PRE|       2|
+----+------+------+--------+
Glennie Helles Sindholt
  • Thanks Glennie, but this is specific to the 3 columns and I need a generic solution, because the number of columns may vary in my dataframe. – user2427413 Jul 02 '19 at 06:36
  • No, it is NOT specific to 3 columns. I used your example. – Glennie Helles Sindholt Jul 02 '19 at 09:05
  • This also gives me this warning: non-variable type argument Int in type pattern Seq[Int] (the underlying of Seq[Int]) is unchecked since it is eliminated by erasure, on `.flatMap{case Row(dates: Seq[Int], amounts: Seq[Int], statuses:` – user2427413 Jul 02 '19 at 10:58
  • Let's say I have a dataframe with different columns and the number of columns is, say, 5. Then how would the code look? – user2427413 Jul 02 '19 at 11:00
  • What exactly do you mean with `different columns`? Do you mean that the input table has other columns than the ones you posted? – Glennie Helles Sindholt Jul 02 '19 at 11:19
  • And btw, it gives you a warning - not an error. Scala does not enforce types in sequences, so you can leave it out. I simply find it useful and easier to read when I write the types explicitly. – Glennie Helles Sindholt Jul 02 '19 at 11:22
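A hedged sketch (not from the answer above) of the same flatMap with the element types left out of the pattern, which avoids the erasure warning mentioned in the comments; the values are converted with toString, so this variant does not depend on the arrays' element types:

// same imports as above (spark.implicits._ and org.apache.spark.sql.Row)
val noTypeArgs = df
  .flatMap { case Row(dates: Seq[_], amounts: Seq[_], statuses: Seq[_]) =>
    dates.indices.map(i => (dates(i).toString, amounts(i).toString, statuses(i).toString, i + 1)) }
  .toDF("Date", "Amount", "Status", "Sequence")

For a truly variable number of columns, a typed flatMap cannot return a tuple of unknown width, so a generic version usually drops to df.rdd with an explicit output schema; a sketch of that is given further down the thread.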
1

Here is another approach using posexplode for each column and joining all produced dataframes into one:

import org.apache.spark.sql.functions.{posexplode, monotonically_increasing_id, col}

val df = Seq(
  (Seq("2019", "2018", "2017"), Seq("100", "200", "300"), Seq("IN", "PRE", "POST")),
  (Seq("2018"), Seq("73"), Seq("IN")),
  (Seq("2018", "2017"), Seq("56", "89"), Seq("IN", "PRE")))
.toDF("Date","Amount", "Status")
.withColumn("idx", monotonically_increasing_id)

df.columns.filter(_ != "idx").map{
  c => df.select($"idx", posexplode(col(c))).withColumnRenamed("col", c)
}
.reduce((ds1, ds2) => ds1.join(ds2, Seq("idx", "pos")))
.select($"Date", $"Amount", $"Status", $"pos".plus(1).as("Sequence"))
.show

Output:

+----+------+------+--------+
|Date|Amount|Status|Sequence|
+----+------+------+--------+
|2019|   100|    IN|       1|
|2018|   200|   PRE|       2|
|2017|   300|  POST|       3|
|2018|    73|    IN|       1|
|2018|    56|    IN|       1|
|2017|    89|   PRE|       2|
+----+------+------+--------+
abiratsis
  • Thank you so much, but this gives me a sequence like 0, 1, 2 and I need the sequence 1, 2, 3. One more issue I am facing: if, let's say, there is a row like Seq("2018"), Seq("73"), Seq("") in the dataframe, this code skips the whole row. I have a few rows where such a scenario exists. – user2427413 Jul 02 '19 at 06:27
  • Hi there, I will do the required changes as soon as I find a free slot, although I believe that there are some solutions here, such as Shu's, that will perform much better since they are not using a join. Please give it a try and compare the results on your own. – abiratsis Jul 02 '19 at 07:37
  • I have also tried Shu's solution; there are a few issues. 1. It is not generic: it is for 3 columns, and as you know the number of columns may vary in my dataframe, and the names of the columns as well. 2. I am getting the below-mentioned exception: org.apache.spark.sql.catalyst.parser.ParseException: – user2427413 Jul 02 '19 at 07:52
  • Hi again, I updated the code so the sequence starts from 1. As for the second part of the question (row: Seq("2018"), Seq("73"), Seq("")), this will return an empty value for the Status field and not skip the whole row. Here are my results for the mentioned row: `+----+------+------+--------+ |Date|Amount|Status|Sequence| +----+------+------+--------+ |2019| 100| IN| 1| |2018| 200| PRE| 2| |2017| 300| POST| 3| |2018| 73| | 1| |2018| 56| IN| 1| |2017| 89| PRE| 2| +----+------+------+--------+` – abiratsis Jul 02 '19 at 08:30
  • Thanks Alexandros, let me try this solution on my dataframe; if I get any error I will get back to you. Thank you so much for your time. – user2427413 Jul 02 '19 at 08:37
  • Hi again, one issue I have found is that if I have a row in my dataframe like (Seq("2018","2019"), Seq("73"), Seq("PRE","IN")), this code will skip the 2nd combination and the output looks like `+----+------+------+--------+ |Date|Amount|Status|Sequence| +----+------+------+--------+ |2018|73 |PRE |1 | +----+------+------+--------+` but the output should be `+----+------+------+--------+ |Date|Amount|Status|Sequence| +----+------+------+--------+ |2018|73 |PRE |1 | |2019| |IN |2 | +----+------+------+--------+`. I hope you understand. – user2427413 Jul 02 '19 at 10:34
  • So you have some cases in which the columns don't have the same length. This is bad for the arrays, and we will need to make them the same length somehow by filling the missing values with default ones. – abiratsis Jul 03 '19 at 06:01
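Following up on the last comment, a hedged sketch (not from the answer above): replacing the inner join with a full outer join keeps positions that exist in only some of the exploded columns, filling the missing fields with nulls instead of dropping the row:

// same setup as above; only the join type changes
df.columns.filter(_ != "idx").map {
    c => df.select($"idx", posexplode(col(c))).withColumnRenamed("col", c)
  }
  .reduce((ds1, ds2) => ds1.join(ds2, Seq("idx", "pos"), "full_outer"))
  .select($"Date", $"Amount", $"Status", $"pos".plus(1).as("Sequence"))
  .show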
1

You can achieve this by using the DataFrame built-in functions arrays_zip, split, and posexplode.

Explanation:

scala> val df = Seq(("2019,2018,2017", "100,200,300", "IN,PRE,POST"), ("2018", "73", "IN"), ("2018,2017", "56,89", "IN,PRE")).toDF("date", "amount", "status")

scala> :paste
// split each comma-separated string into an array, zip the arrays together,
// and posexplode the zipped array to get the position and the zipped struct
df.selectExpr("""posexplode(
                   arrays_zip(
                     split(date, ","),
                     split(amount, ","),
                     split(status, ","))) as (p, colum)""")
  .selectExpr(
    "colum.`0` as Date",   // field 0 of the zipped struct is the date
    "colum.`1` as Amount",
    "colum.`2` as Status",
    "p + 1 as Sequence")   // positions are 0-based, so add 1
  .show()

Result:

+----+------+------+--------+
|Date|Amount|Status|Sequence|
+----+------+------+--------+
|2019|   100|    IN|       1|
|2018|   200|   PRE|       2|
|2017|   300|  POST|       3|
|2018|    73|    IN|       1|
|2018|    56|    IN|       1|
|2017|    89|   PRE|       2|
+----+------+------+--------+
notNull
  • Thanks Shu, but this is specific to the 3 columns and I need a generic solution; the number of columns may vary in my dataframe. – user2427413 Jul 02 '19 at 06:35
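Regarding the generic requirement raised in the comment, a hedged sketch (not from the answer above) that builds the same arrays_zip/posexplode expressions dynamically from df.columns; it assumes every column is a comma-separated string and addresses the zipped struct fields by position, as the answer does:

// build arrays_zip(split(col1, ","), split(col2, ","), ...) over whatever columns df has
val zipExpr = df.columns.map(c => s"""split($c, ",")""").mkString("arrays_zip(", ", ", ")")

df.selectExpr(s"posexplode($zipExpr) as (p, colum)")
  .selectExpr(df.columns.zipWithIndex.map { case (c, i) => s"colum.`$i` as $c" } :+ "p + 1 as Sequence": _*)
  .show()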
0

Assuming the number of data elements in each column is the same for each row:

First, I recreated your DataFrame

import org.apache.spark.sql._
import scala.collection.mutable.ListBuffer

val df = Seq(("2019,2018,2017", "100,200,300", "IN,PRE,POST"), ("2018", "73", "IN"),
  ("2018,2017", "56,89", "IN,PRE")).toDF("Date", "Amount", "Status")

Next, I split the rows and added a sequence value, then converted back to a DF:

val exploded = df.rdd.flatMap(row => {
  val buffer = new ListBuffer[(String, String, String, Int)]
  val dateSplit = row(0).toString.split("\\,", -1)
  val amountSplit = row(1).toString.split("\\,", -1)
  val statusSplit = row(2).toString.split("\\,", -1)
  val seqSize = dateSplit.size
  for(i <- 0 to seqSize-1)
    buffer += Tuple4(dateSplit(i), amountSplit(i), statusSplit(i), i+1)
  buffer.toList
}).toDF((df.columns:+"Sequence"): _*)

I'm sure there are other ways to do it without first converting the DF to an RDD, but this still results in a DF with the correct answer.

Let me know if you have any questions.

Jonathan Myers
  • Thanks Jonathan, but this is specific to the 3 columns and I need a generic solution; the number of columns may vary in my dataframe. – user2427413 Jul 02 '19 at 06:35
  • One more thing: the amount of data I have is very large, and a for loop will slow down the transformation and loading of the data. – user2427413 Jul 02 '19 at 08:53
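A hedged sketch (not part of the answer above) of a column-count-agnostic variant of the same RDD approach, addressing the first comment; it assumes every column is a comma-separated string, that all columns of a row split to the same length, and that an active SparkSession named spark is available:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// keep the original column names and append a 1-based Sequence column
val outSchema = StructType(df.schema.fields :+ StructField("Sequence", IntegerType))

val explodedGeneric = spark.createDataFrame(
  df.rdd.flatMap { row =>
    val parts = row.toSeq.map(_.toString.split(",", -1))                  // split every column
    parts.head.indices.map(i => Row.fromSeq(parts.map(_(i)) :+ (i + 1))) // one output row per position
  },
  outSchema)

Mapping over the indices also avoids the explicit mutable buffer and for loop mentioned in the second comment, although the per-row work is the same.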
0

I used transpose to zip all the sequences by position and then did a posexplode. The selects on the DataFrames are built dynamically to satisfy the condition from the question: the number of columns and the length of the values will vary.

import org.apache.spark.sql.functions._


val df = Seq(
  ("2019,2018,2017", "100,200,300", "IN,PRE,POST"),
  ("2018", "73", "IN"),
  ("2018,2017", "56,89", "IN,PRE")
).toDF("Date", "Amount", "Status")
df: org.apache.spark.sql.DataFrame = [Date: string, Amount: string ... 1 more field]

scala> df.show(false)
+--------------+-----------+-----------+
|Date          |Amount     |Status     |
+--------------+-----------+-----------+
|2019,2018,2017|100,200,300|IN,PRE,POST|
|2018          |73         |IN         |
|2018,2017     |56,89      |IN,PRE     |
+--------------+-----------+-----------+


scala> def transposeSeqOfSeq[S](x:Seq[Seq[S]]): Seq[Seq[S]] = { x.transpose }
transposeSeqOfSeq: [S](x: Seq[Seq[S]])Seq[Seq[S]]

scala> val myUdf = udf { transposeSeqOfSeq[String] _}
myUdf: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,ArrayType(ArrayType(StringType,true),true),Some(List(ArrayType(ArrayType(StringType,true),true))))

scala> val df2 = df.select(df.columns.map(c => split(col(c), ",") as c): _*)
df2: org.apache.spark.sql.DataFrame = [Date: array<string>, Amount: array<string> ... 1 more field]

scala> df2.show(false)
+------------------+---------------+---------------+
|Date              |Amount         |Status         |
+------------------+---------------+---------------+
|[2019, 2018, 2017]|[100, 200, 300]|[IN, PRE, POST]|
|[2018]            |[73]           |[IN]           |
|[2018, 2017]      |[56, 89]       |[IN, PRE]      |
+------------------+---------------+---------------+


scala> val df3 = df2.withColumn("allcols", array(df.columns.map(c => col(c)): _*))
df3: org.apache.spark.sql.DataFrame = [Date: array<string>, Amount: array<string> ... 2 more fields]

scala> df3.show(false)
+------------------+---------------+---------------+------------------------------------------------------+
|Date              |Amount         |Status         |allcols                                               |
+------------------+---------------+---------------+------------------------------------------------------+
|[2019, 2018, 2017]|[100, 200, 300]|[IN, PRE, POST]|[[2019, 2018, 2017], [100, 200, 300], [IN, PRE, POST]]|
|[2018]            |[73]           |[IN]           |[[2018], [73], [IN]]                                  |
|[2018, 2017]      |[56, 89]       |[IN, PRE]      |[[2018, 2017], [56, 89], [IN, PRE]]                   |
+------------------+---------------+---------------+------------------------------------------------------+


scala> val df4 = df3.withColumn("ab", myUdf($"allcols")).select($"ab", posexplode($"ab"))
df4: org.apache.spark.sql.DataFrame = [ab: array<array<string>>, pos: int ... 1 more field]

scala> df4.show(false)
+------------------------------------------------------+---+-----------------+
|ab                                                    |pos|col              |
+------------------------------------------------------+---+-----------------+
|[[2019, 100, IN], [2018, 200, PRE], [2017, 300, POST]]|0  |[2019, 100, IN]  |
|[[2019, 100, IN], [2018, 200, PRE], [2017, 300, POST]]|1  |[2018, 200, PRE] |
|[[2019, 100, IN], [2018, 200, PRE], [2017, 300, POST]]|2  |[2017, 300, POST]|
|[[2018, 73, IN]]                                      |0  |[2018, 73, IN]   |
|[[2018, 56, IN], [2017, 89, PRE]]                     |0  |[2018, 56, IN]   |
|[[2018, 56, IN], [2017, 89, PRE]]                     |1  |[2017, 89, PRE]  |
+------------------------------------------------------+---+-----------------+

scala> val selCols = (0 until df.columns.length).map(i => $"col".getItem(i).as(df.columns(i))) :+ ($"pos"+1).as("Sequence")
selCols: scala.collection.immutable.IndexedSeq[org.apache.spark.sql.Column] = Vector(col[0] AS `Date`, col[1] AS `Amount`, col[2] AS `Status`, (pos + 1) AS `Sequence`)

scala> df4.select(selCols:_*).show(false)
+----+------+------+--------+
|Date|Amount|Status|Sequence|
+----+------+------+--------+
|2019|100   |IN    |1       |
|2018|200   |PRE   |2       |
|2017|300   |POST  |3       |
|2018|73    |IN    |1       |
|2018|56    |IN    |1       |
|2017|89    |PRE   |2       |
+----+------+------+--------+
C.S.Reddy Gadipally
  • Thank you so much for your solution. This works fine for the above rows, but for my dataframe, at the line `val df4 = df3.withColumn("ab", myUdf($"allcols")).select($"ab", posexplode($"ab"))`, it gives me the error "Task not serializable". – user2427413 Jul 02 '19 at 06:30
  • Check this to solve the "task not serializable" error: https://stackoverflow.com/a/22596875/6024518 – C.S.Reddy Gadipally Jul 02 '19 at 15:13
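A hedged sketch of one common workaround for that error in this pattern, assuming it is caused by the UDF closure capturing a non-serializable enclosing object: define the transpose inline as a function value rather than referencing a method:

// assumes the same df3 as above; Seq.transpose does the zipping by position
val transposeUdf = udf { (x: Seq[Seq[String]]) => x.transpose }
val df4 = df3.withColumn("ab", transposeUdf($"allcols")).select($"ab", posexplode($"ab"))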
0

This is why I love the Spark Core API. Just with the help of map and flatMap you can handle many problems. Pass your df and an instance of SQLContext to the method below and it will give the desired result:

import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

def reShapeDf(df: DataFrame, sqlContext: SQLContext): DataFrame = {

    // read the three comma-separated string columns
    val rdd = df.rdd.map(m => (m.getAs[String](0), m.getAs[String](1), m.getAs[String](2)))

    // split each column, zip the pieces by position and attach a 0-based index
    val rdd1 = rdd.flatMap(a => a._1.split(",").zip(a._2.split(",")).zip(a._3.split(",")).zipWithIndex)
    val rdd2 = rdd1.map {
      case (((a, b), c), i) => Row(a, b, c, i + 1)   // 1-based sequence number
    }

    // original schema plus the Sequence column
    val schema = StructType(df.schema.fields :+ StructField("Sequence", IntegerType))
    sqlContext.createDataFrame(rdd2, schema)
}
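A hypothetical usage example (names assumed, not from the answer): spark is an active SparkSession and df is the three-column comma-separated DataFrame from the question.

// hypothetical usage; `spark` and `df` are assumed from the question's setup
reShapeDf(df, spark.sqlContext).show(false)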
a9207