
I have a dataset with date, accountid and value. I want to transform it into a new dataset where, if an accountid is not present on a particular date, a row is added for that accountid with a value of 0 on that date. Is this possible?

val df = sc.parallelize(Seq(
  ("2018-01-01", 100.5, "id1"),
  ("2018-01-02", 120.6, "id1"),
  ("2018-01-03", 450.2, "id2")
)).toDF("date", "val", "accountid")

+----------+-----+---------+
|      date|  val|accountid|
+----------+-----+---------+
|2018-01-01|100.5|      id1|
|2018-01-02|120.6|      id1|
|2018-01-03|450.2|      id2|
+----------+-----+---------+

I want to transform this dataset into this format:

+----------+-----+---------+
|      date|  val|accountid|
+----------+-----+---------+
|2018-01-01|100.5|      id1|
|2018-01-01|  0.0|      id2|
|2018-01-02|120.6|      id1|
|2018-01-02|  0.0|      id2|
|2018-01-03|450.2|      id2|
|2018-01-03|  0.0|      id1|
+----------+-----+---------+
Masterbuilder

2 Answers


You can use a udf function to fulfil this requirement.

Before that, you will have to collect the complete set of accountids and broadcast it so that it can be used inside the udf function.

The array returned from the udf function is then exploded, and finally the required columns are selected.

import org.apache.spark.sql.functions._

// collect all distinct accountids and broadcast them to the executors
val idList = df.select(collect_set("accountid")).first().getAs[Seq[String]](0)
val broadCastedIdList = sc.broadcast(idList)

// for each row, emit the original record plus a 0.0 record for every other accountid
def populateUdf = udf((date: String, value: Double, accountid: String) =>
  Array(accounts(date, value, accountid)) ++
    broadCastedIdList.value.filterNot(_ == accountid).map(accounts(date, 0.0, _)))

// explode the returned array of structs and flatten it back into columns
df.select(populateUdf(col("date"), col("val"), col("accountid")).as("struct"))
  .withColumn("struct", explode(col("struct")))
  .select(col("struct.date"), col("struct.value").as("val"), col("struct.accountid"))
  .show(false)

And of course you would need a case class (define it before the udf so that it is in scope):

case class accounts(date:String, value:Double, accountid:String)

which should give you

+----------+-----+---------+
|date      |val  |accountid|
+----------+-----+---------+
|2018-01-01|100.5|id1      |
|2018-01-01|0.0  |id2      |
|2018-01-02|120.6|id1      |
|2018-01-02|0.0  |id2      |
|2018-01-03|450.2|id2      |
|2018-01-03|0.0  |id1      |
+----------+-----+---------+

Note: the field is named value rather than val in the case class because val is a reserved keyword in Scala and cannot be used as an identifier.
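
If you prefer not to list the struct fields one by one, the final selection could also expand the struct with a star and rename the value field back to val; a minimal sketch, assuming the same df, populateUdf and accounts definitions as above:

// expand all struct fields at once, then rename value back to val
df.select(populateUdf(col("date"), col("val"), col("accountid")).as("struct"))
  .withColumn("struct", explode(col("struct")))
  .select("struct.*")
  .withColumnRenamed("value", "val")
  .show(false)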

Ramesh Maharjan

You can create a reference with all date / accountid combinations:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row

// find the minimum and maximum date as epoch seconds
val Row(minTs: Long, maxTs: Long) = df
  .select(to_date($"date").cast("timestamp").cast("bigint") as "date")
  .select(min($"date"), max($"date")).first

// one day in seconds
val by = 60 * 60 * 24

// generate every date in the range and cross join with all distinct accountids
val ref = spark
  .range(minTs, maxTs + by, by)
  .select($"id".cast("timestamp").cast("date").cast("string").as("date"))
  .crossJoin(df.select("accountid").distinct)

and outer join with input data:

ref.join(df, Seq("date", "accountid"), "leftouter").na.fill(0.0).show
// +----------+---------+-----+      
// |      date|accountid|  val|
// +----------+---------+-----+
// |2018-01-03|      id1|  0.0|
// |2018-01-01|      id1|100.5|
// |2018-01-02|      id2|  0.0|
// |2018-01-02|      id1|120.6|
// |2018-01-03|      id2|450.2|
// |2018-01-01|      id2|  0.0|
// +----------+---------+-----+
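
The join output above is not ordered; if you want the rows sorted like the expected result, the same join can simply be ordered:

ref.join(df, Seq("date", "accountid"), "leftouter")
  .na.fill(0.0)
  .orderBy("date", "accountid")
  .show()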

Concept adopted from this sparklyr answer by user6910411.
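
As an aside, on Spark 2.4 or later the date reference could also be built with the built-in sequence function instead of spark.range; a minimal sketch under that assumption (refAlt is a hypothetical name), yielding the same reference to join as above:

// sequence over date arguments steps by one day by default (Spark 2.4+)
val dateRange = df
  .select(sequence(min(to_date($"date")), max(to_date($"date"))).as("dates"))
  .select(explode($"dates").as("date"))
  .select($"date".cast("string").as("date"))

val refAlt = dateRange.crossJoin(df.select("accountid").distinct)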

Alper t. Turker
  • Excellent! It works. One question: is the cross join performance-intensive, say if the original dataset contains 1M records? – Masterbuilder May 15 '18 at 16:11
  • The complete process is expensive due to shuffles and the conversion from sparse to dense, but you cannot really avoid that cost given the desired result. – Alper t. Turker May 15 '18 at 16:15