
I have a dataframe with a column that is a nested StructType. The StructType is deeply nested and may contain other structs. I want to update this column at the lowest level. I tried withField, but it doesn't work if any of the parent structs is null. I would appreciate any help with this.

The example schema is:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._

val schema = new StructType()
  .add("key", StringType)
  .add(
    "cells",
    ArrayType(
      new StructType()
        .add("family", StringType)
        .add("qualifier", StringType)
        .add("timestamp", LongType)
        .add("nestStruct", new StructType()
          .add("id1", LongType)
          .add("id2", StringType)
          .add("id3", new StructType()
            .add("id31", LongType)
            .add("id32", StringType)))
    )
  )

val data = Seq(
  Row(
    "1235321863",
    Array(
      Row("a", "b", 1L, null)
    )
  )
)

val df_test = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)

val result = df_test.withColumn(
  "cell1",
  transform($"cells", cell => {
    cell.withField("nestStruct.id3.id31", lit(40)) /* This line doesn't do anything if nestStruct is null. */
  }))
result.show(false)
result.printSchema()
result.explain() /* The physical plan shows that if a field is null it will just return null. */
Vikas
  • Can you add expected and actual output? If the parent struct is non-null, does it work as expected? Also, what do you expect if the parent is null? – Ronak Jain Feb 18 '23 at 04:23

1 Answer


You can use the solution suggested for this question: How do I add a column to a nested struct in a PySpark dataframe?
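That linked solution rebuilds the struct column explicitly rather than patching it in place. A related workaround for the Scala example above (a coalesce-based sketch of my own, not the linked answer's exact code) is to substitute an all-null struct of the same shape whenever nestStruct is null, so withField has something to update. It assumes, as in the sample data, that only nestStruct itself (not id3) can be null; the name patched is illustrative:

val patched = df_test.withColumn(
  "cells",
  transform($"cells", cell =>
    cell.withField(
      "nestStruct",
      // Keep an existing non-null nestStruct; otherwise substitute an
      // all-null struct of the same shape, then set the leaf field.
      coalesce(
        cell.getField("nestStruct"),
        struct(
          lit(null).cast(LongType).as("id1"),
          lit(null).cast(StringType).as("id2"),
          struct(
            lit(null).cast(LongType).as("id31"),
            lit(null).cast(StringType).as("id32")
          ).as("id3")
        )
      ).withField("id3.id31", lit(40))
    )
  )
)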

Or you can try the following:
Write your current dataframe to a JSON file, read that file into a string, use a regular expression to add the field you want to the JSON string, write the modified string back to a file, and read that file into a new dataframe.

For example, using the schema provided above:

import json, re

# Read the exported JSON file; str() gives the dict's Python repr.
with open('./pyspark_sandbox_sample.json') as input_file:
    string_data = str(json.load(input_file))

# Append an 'id33' field after every 'id32' occurrence.
string_data = re.sub(r"'id32': '(.*?)'", r"'id32': '\1', 'id33': 40", string_data)

# eval() turns the patched repr back into a dict so it can be re-serialized.
with open('./pyspark_sandbox_sample.json', 'w') as output_file:
    json.dump(eval(string_data), output_file)
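The steps around that snippet (exporting the dataframe to JSON and reading the patched file back) might look like the following Scala sketch; the single-file export, the overwrite mode, and the paths are assumptions made to match the snippet above:

// Step 1 (assumed): export the dataframe as JSON; coalesce(1) keeps the
// output in a single part file for the regex script above to patch.
df_test.coalesce(1).write.mode("overwrite").json("./pyspark_sandbox_sample")

// ... run the regex patch above on the exported file ...

// Final step (assumed): read the patched JSON back into a dataframe.
val df_patched = spark.read.json("./pyspark_sandbox_sample.json")

Note that this round-trips the data through disk and eval, so it is only practical for small dataframes.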
ARCrow