
I have a Spark DataFrame with the following schema:

[{ "map": {
    "completed-stages": 1,
    "total-stages": 1 },
    "rec": "test-plan",
    "status": {
        "state": "SUCCESS"
    }
  },
  { "map": {
    "completed-stages": 1,
    "total-stages": 1 },
    "rec": "test-proc",
    "status": {
        "state": "FAILED"
  }
}]

I want to transform it into another DataFrame with the following schema: [{"rec": "test-plan", "status": "SUCCESS"}, {"rec": "test-proc", "status": "FAILED"}]

I have written the following code, but it doesn't compile; the compiler complains about a missing Encoder.

import scala.collection.mutable.WrappedArray
import scala.collection.immutable.HashMap

val fdf = DF.map(f => {
  // the first column holds the array of command maps
  val listCommands = f.get(0).asInstanceOf[WrappedArray[Map[String, Any]]]
  val m = listCommands.map(h => {
    var rec = "none"
    var status = "none"

    if (h.contains("status")) {
      status = h.get("status") match {
        case Some(x) => x.asInstanceOf[HashMap[String, String]].getOrElse("state", "none")
        case _ => "none"
      }

      if (h.contains("rec")) {
        rec = h.get("rec") match {
          case Some(x: String) => x
          case _ => "none"
        }
      }
    }

    Map("status" -> status, "rec" -> rec)
  })

  m.flatten
})

Please suggest the right way.


2 Answers


That's going to be tricky since the top-level elements of the JSON objects are not the same, i.e. you have map1 and map2, and hence the schema is inconsistent. I'd speak to the "data producer" and request a change so that the name of the command is described by a separate element.
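For illustration only (all field names below are made up), a more consistent payload could carry the command name and the stage counters under fixed keys:

[{ "name": "test-plan",
   "stages": { "completed-stages": 1, "total-stages": 1 },
   "status": { "state": "SUCCESS" } },
 { "name": "test-proc",
   "stages": { "completed-stages": 1, "total-stages": 1 },
   "status": { "state": "FAILED" } }]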


Given the schema of the DataFrame is as follows:

scala> commands.printSchema
root
 |-- commands: array (nullable = true)
 |    |-- element: string (containsNull = true)

and the number of elements (rows) in it:

scala> commands.count
res1: Long = 1

You have to explode the commands array first and then access the fields of interest.

// 1. Explode the array
import org.apache.spark.sql.functions.explode

val commandsExploded = commands.select(explode($"commands") as "command")
scala> commandsExploded.count
res2: Long = 2

Let's create a schema for the JSON-encoded records. One possibility is as follows.

// Note that it accepts map1 and map2 fields
import org.apache.spark.sql.types._
val schema = StructType(
  StructField("map1",
    StructType(
      StructField("completed-stages", LongType, true) ::
      StructField("total-stages", LongType, true) :: Nil), true) ::
  StructField("map2",
    StructType(
      StructField("completed-stages", LongType, true) ::
      StructField("total-stages", LongType, true) :: Nil), true) ::
  StructField("rec", StringType,true) ::
  StructField("status", StructType(
    StructField("state", StringType, true) :: Nil), true
  ) :: Nil)
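
As a side note, an equivalent schema could also be derived from case classes rather than hand-written. A minimal sketch (the Command, Stages and Status names are made up here; the dashed field names need backticks, and the Long fields come out non-nullable):

import org.apache.spark.sql.Encoders

case class Stages(`completed-stages`: Long, `total-stages`: Long)
case class Status(state: String)
case class Command(map1: Stages, map2: Stages, rec: String, status: Status)

// derives a StructType equivalent to the hand-written one (modulo nullability)
val derivedSchema = Encoders.product[Command].schema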

With that, you can use the from_json standard function, which takes a column of JSON-encoded strings and a schema.

import org.apache.spark.sql.functions.from_json

val commands = commandsExploded.select(from_json($"command", schema) as "command")
scala> commands.show(truncate = false)
+-------------------------------+
|command                        |
+-------------------------------+
|[[1, 1],, test-plan, [SUCCESS]]|
|[, [1, 1], test-proc, [FAILED]]|
+-------------------------------+
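
Note that from_json gives null for records that don't match the schema, so a quick sanity check (just a sketch) could be:

// expect 0 if every record parsed against the schema
commands.filter($"command".isNull).count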

Let's have a look at the schema of the commands dataset.

scala> commands.printSchema
root
 |-- command: struct (nullable = true)
 |    |-- map1: struct (nullable = true)
 |    |    |-- completed-stages: long (nullable = true)
 |    |    |-- total-stages: long (nullable = true)
 |    |-- map2: struct (nullable = true)
 |    |    |-- completed-stages: long (nullable = true)
 |    |    |-- total-stages: long (nullable = true)
 |    |-- rec: string (nullable = true)
 |    |-- status: struct (nullable = true)
 |    |    |-- state: string (nullable = true)

The fields of interest, rec and status, sit inside the command struct and are .-accessible; status is itself a struct, so its state field is reached the same way.

val recs = commands.select(
  $"command.rec" as "rec",
  $"command.status.state" as "status")

scala> recs.show
+---------+-------+
|      rec| status|
+---------+-------+
|test-plan|SUCCESS|
|test-proc| FAILED|
+---------+-------+
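
If you prefer a typed view at this point, the same data works as a Dataset. A minimal sketch, with Rec being a made-up case class name:

case class Rec(rec: String, status: String)

// spark.implicits._ (already in scope in spark-shell) provides the Encoder[Rec]
val recsDS = recs.as[Rec]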

Converting it to a single-record, JSON-encoded dataset requires Dataset.toJSON followed by the collect_list standard function.

import org.apache.spark.sql.functions.collect_list

val result = recs.toJSON.agg(collect_list("value"))
scala> result.show(truncate = false)
+-------------------------------------------------------------------------------+
|collect_list(value)                                                            |
+-------------------------------------------------------------------------------+
|[{"rec":"test-plan","status":"SUCCESS"}, {"rec":"test-proc","status":"FAILED"}]|
+-------------------------------------------------------------------------------+
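
If you need that as a single JSON array string on the driver, a small sketch using Row.getSeq could be:

// pull back the one aggregated row and join the JSON strings
val jsonArray = result.first.getSeq[String](0).mkString("[", ",", "]")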
Jacek Laskowski

You didn't provide the schema for the df, so the below might not work for you. I saved the JSON sample in a test.json file and read it with val df = spark.read.option("multiLine", true).json("test.json"), in which case, to get the JSON you want, you just do df.select($"rec", $"status.state").write.json("test1.json").
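Spelled out, and with state aliased to status so the output field name matches the desired schema (the one-liner above would emit it as state), that might look like:

val df = spark.read.option("multiLine", true).json("test.json")
df.select($"rec", $"status.state" as "status").write.json("test1.json")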

Arnon Rotem-Gal-Oz