
I've got a question about doing an aggregation on an array of nested JSON. I have a sample order DataFrame (shown here as JSON):

{
  "orderId": "oi1",
  "orderLines": [
    {
      "productId": "p1",
      "quantity": 1,
      "sequence": 1,
      "totalPrice": {
        "gross": 50,
        "net": 40,
        "tax": 10
      }
    },
    {
      "productId": "p2",
      "quantity": 3,
      "sequence": 2,
      "totalPrice": {
        "gross": 300,
        "net": 240,
        "tax": 60
      }
    }
  ]
}

How can I use Spark SQL to sum the quantities across all lines for a given order?

e.g. in this case 1 + 3 = 4

I'd like to write something like the below, but there doesn't appear to be an equivalent built-in function for it (unless I've missed it, which could well be the case!):

SELECT
  orderId,
  sum_array(orderLines.quantity) as totalQuantityItems
FROM
   orders

Maybe a custom Scala UDF is needed? If so, what would that look like / are there any examples? Going one level deeper into the nesting, I'd also like to sum the nested totals:

SELECT
  orderId,
  sum_array(orderLines.totalPrice.net) as totalOrderNet
FROM
   orders
Kurt Maile

1 Answer


Read the dataset using spark.read.json (with multi-line parsing enabled, since each JSON record spans several lines).

import spark.implicits._ // encoders for the tuple Dataset (and toDF/$ further down)

val orders = spark.
  read.
  option("multiLine", true). // the option was named "wholeFile" in pre-2.2.0 builds
  json("orders.json").
  as[(String, Seq[(String, Long, Long, (Long, Long, Long))])]
scala> orders.show(truncate = false)
+-------+--------------------------------------------+
|orderId|orderLines                                  |
+-------+--------------------------------------------+
|oi1    |[[p1,1,1,[50,40,10]], [p2,3,2,[300,240,60]]]|
+-------+--------------------------------------------+

scala> orders.map { case (id, lines) => (id, lines.map(_._2).sum) }.toDF("id", "sum").show
+---+---+
| id|sum|
+---+---+
|oi1|  4|
+---+---+
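
If you'd rather skip the tuple encoder and stay with the untyped DataFrame API, here is a minimal sketch of the same aggregation using explode on the orderLines array (column names come straight from the JSON above; ordersDF is just my name for the raw read):

import org.apache.spark.sql.functions.{col, explode, sum}

// One row per order line, then a plain groupBy/sum per order.
val ordersDF = spark.read.option("multiLine", true).json("orders.json")

ordersDF.
  select(col("orderId"), explode(col("orderLines")) as "line").
  groupBy("orderId").
  agg(sum(col("line.quantity")) as "totalQuantityItems").
  show

This keeps the column names from the JSON (no positional tuple bookkeeping) and gives 4 for order oi1, matching the result above.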

You could also make the typed (map-based) version even "prettier" using Scala's for-comprehension.

val quantities = for {
  o <- orders
  id = o._1
  quantity <- o._2
} yield (id, quantity._2)

import org.apache.spark.sql.functions.sum // for the sum aggregate below

val sumPerOrder = quantities.
  toDF("id", "quantity"). // <-- back to DataFrames to have names
  groupBy("id").
  agg(sum("quantity") as "sum")
scala> sumPerOrder.show
+---+---+
| id|sum|
+---+---+
|oi1|  4|
+---+---+
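
For completeness: if you can use Spark 2.4 or later, SQL gained higher-order functions, and aggregate is essentially the sum_array you were after: it folds the array in place, so no explode or typed mapping is needed. A sketch against a temp view (the view name orders is my own choice here):

// Spark 2.4+ only: aggregate() folds an array column without exploding it.
orders.createOrReplaceTempView("orders")

spark.sql("""
  SELECT
    orderId,
    aggregate(orderLines, CAST(0 AS BIGINT), (acc, line) -> acc + line.quantity)       AS totalQuantityItems,
    aggregate(orderLines, CAST(0 AS BIGINT), (acc, line) -> acc + line.totalPrice.net)  AS totalOrderNet
  FROM orders
""").show

The second expression also covers the nested totalPrice.net case from your question; the explode-based approach above would handle it the same way with sum(line.totalPrice.net).
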
Jacek Laskowski