I have a problem I was unable to solve when working with Scala Spark (or PySpark): how can we merge two fields that are arrays of structs with different fields?

For example, if I have a schema like so:

df.printSchema()
root
 |-- arrayOne: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a: string (nullable = true)
 |    |    |-- b: string (nullable = true)
 |    |    |-- c: string (nullable = true)
 |    |    |-- Q: string (nullable = true)
 |-- ArrayTwo: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- x: string (nullable = true)
 |    |    |-- y: string (nullable = true)
 |    |    |-- z: string (nullable = true)
 |    |    |-- Q: string (nullable = true)

Can I create a DataFrame with the following schema using a UDF?

df.printSchema()
root
 |-- arrayOne: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a: string (nullable = true)
 |    |    |-- b: string (nullable = true)
 |    |    |-- c: string (nullable = true)
 |    |    |-- Q: string (nullable = true)
 |-- ArrayTwo: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- x: string (nullable = true)
 |    |    |-- y: string (nullable = true)
 |    |    |-- z: string (nullable = true)
 |    |    |-- Q: string (nullable = true)
 |-- ArrayThree: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a: string (nullable = true)
 |    |    |-- b: string (nullable = true)
 |    |    |-- c: string (nullable = true)
 |    |    |-- Q: string (nullable = true)
 |    |    |-- x: string (nullable = true)
 |    |    |-- y: string (nullable = true)
 |    |    |-- z: string (nullable = true)

When a, b, c are non-null, x, y, z are null, and vice versa. The exception is Q, which is non-null in both cases and has the same value in both arrays.

The UDF is an important aspect here, because exploding (explode_outer) both fields would:

  1. Be too expensive
  2. Repeat the elements of the second array, which would corrupt the fidelity of the data because of the field Q.
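
The repetition concern in point 2 can be illustrated without Spark. The sketch below uses plain Python lists of dicts as stand-ins for the two array columns (the sample values are made up) and assumes that exploding both arrays one after the other pairs every element of one array with every element of the other, like a cross product:

```python
from itertools import product

# Hypothetical sample row: plain dicts stand in for the structs
array_one = [
    {"a": "a1", "b": "b1", "c": "c1", "Q": "q"},
    {"a": "a2", "b": "b2", "c": "c2", "Q": "q"},
]
array_two = [
    {"x": "x1", "y": "y1", "z": "z1", "Q": "q"},
    {"x": "x2", "y": "y2", "z": "z2", "Q": "q"},
    {"x": "x3", "y": "y3", "z": "z3", "Q": "q"},
]

# Exploding both arrays in sequence pairs every element with every other one
exploded = list(product(array_one, array_two))

# Concatenating the arrays keeps each element exactly once
concatenated = array_one + array_two

print(len(exploded))      # 6
print(len(concatenated))  # 5
```

With 2 and 3 elements, the paired form has 6 rows in which every element of the second array appears twice, while concatenation keeps the 5 original elements.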

Writing a UDF in Pig Latin or even plain MapReduce would be very easy, but for some reason it is very complicated in Spark, for me at least.

What would be a way to write a UDF that concatenates the two arrays and creates a new struct with the superset of the fields of the two different structs?

Gligorijevic

2 Answers

Here's a sample test I did. I created 2 fields of Array(Struct()) - arr_struct1 and arr_struct2. Using them, I created the new field arr_struct12 that has all elements of the previous 2 array-struct fields. I've retained all columns in the printSchema() for a better understanding.

from pyspark.sql import functions as func

# data_sdf is a DataFrame with long columns a to g
data_sdf. \
    withColumn('arr_struct1', func.array(func.struct(func.col('a').alias('a'), func.col('b').alias('b'), func.col('c').alias('c')))). \
    withColumn('arr_struct2', func.array(func.struct(func.col('e').alias('e'), func.col('f').alias('f')))). \
    withColumn('struct1', func.col('arr_struct1')[0]). \
    withColumn('struct2', func.col('arr_struct2')[0]). \
    withColumn('arr_struct12', func.array(func.struct('struct1.*', 'struct2.*'))). \
    printSchema()

# ignore columns a to g in the schema below

# root
#  |-- a: long (nullable = true)
#  |-- b: long (nullable = true)
#  |-- c: long (nullable = true)
#  |-- d: long (nullable = true)
#  |-- e: long (nullable = true)
#  |-- f: long (nullable = true)
#  |-- g: long (nullable = true)
#  |-- arr_struct1: array (nullable = false)
#  |    |-- element: struct (containsNull = false)
#  |    |    |-- a: long (nullable = true)
#  |    |    |-- b: long (nullable = true)
#  |    |    |-- c: long (nullable = true)
#  |-- arr_struct2: array (nullable = false)
#  |    |-- element: struct (containsNull = false)
#  |    |    |-- e: long (nullable = true)
#  |    |    |-- f: long (nullable = true)
#  |-- struct1: struct (nullable = true)
#  |    |-- a: long (nullable = true)
#  |    |-- b: long (nullable = true)
#  |    |-- c: long (nullable = true)
#  |-- struct2: struct (nullable = true)
#  |    |-- e: long (nullable = true)
#  |    |-- f: long (nullable = true)
#  |-- arr_struct12: array (nullable = false)
#  |    |-- element: struct (containsNull = false)
#  |    |    |-- a: long (nullable = true)
#  |    |    |-- b: long (nullable = true)
#  |    |    |-- c: long (nullable = true)
#  |    |    |-- e: long (nullable = true)
#  |    |    |-- f: long (nullable = true)

To keep only specific fields, select them with col('struct_col.field_name') instead of the *.

data_sdf. \
    withColumn('arr_struct1', func.array(func.struct(func.col('a').alias('a'), func.col('b').alias('b'), func.col('c').alias('c')))). \
    withColumn('arr_struct2', func.array(func.struct(func.col('e').alias('e'), func.col('f').alias('f')))). \
    withColumn('struct1', func.col('arr_struct1')[0]). \
    withColumn('struct2', func.col('arr_struct2')[0]). \
    withColumn('arr_struct12', 
               func.array(func.struct(func.col('struct1.a').alias('a'), 
                                      func.col('struct1.b').alias('b'), 
                                      func.col('struct2.f').alias('f')
                                      )
                          )
               ). \
    printSchema()

# ignore columns a to g in the schema below

# root
#  |-- a: long (nullable = true)
#  |-- b: long (nullable = true)
#  |-- c: long (nullable = true)
#  |-- d: long (nullable = true)
#  |-- e: long (nullable = true)
#  |-- f: long (nullable = true)
#  |-- g: long (nullable = true)
#  |-- arr_struct1: array (nullable = false)
#  |    |-- element: struct (containsNull = false)
#  |    |    |-- a: long (nullable = true)
#  |    |    |-- b: long (nullable = true)
#  |    |    |-- c: long (nullable = true)
#  |-- arr_struct2: array (nullable = false)
#  |    |-- element: struct (containsNull = false)
#  |    |    |-- e: long (nullable = true)
#  |    |    |-- f: long (nullable = true)
#  |-- struct1: struct (nullable = true)
#  |    |-- a: long (nullable = true)
#  |    |-- b: long (nullable = true)
#  |    |-- c: long (nullable = true)
#  |-- struct2: struct (nullable = true)
#  |    |-- e: long (nullable = true)
#  |    |-- f: long (nullable = true)
#  |-- arr_struct12: array (nullable = false)
#  |    |-- element: struct (containsNull = false)
#  |    |    |-- a: long (nullable = true)
#  |    |    |-- b: long (nullable = true)
#  |    |    |-- f: long (nullable = true)
samkart
  • Thank you for your reply @samkart! Checking your solution, it seems that you are taking only the first struct element from each array, combining the two structs, and wrapping them in another array that will contain only a single element? The schema will look right, but the data will be lost. Am I right about this? – Gligorijevic Jul 18 '22 at 16:36
  • @Gligorijevic `ArrayOne` and `ArrayTwo` have only 1 element (that's a struct). so, the [0] pulls the struct. no data is lost in this case as it pulls the whole struct. I've retained all columns in the `printSchema()`, so take a look there – samkart Jul 18 '22 at 16:39
  • I agree with your solution for the case where arrays contain only one struct, but in my case arrays contain many structs inside, while the two arrays of structs have one field that is shared. – Gligorijevic Jul 19 '22 at 21:48
  • @Gligorijevic please add all nuances in your example and question for us to understand better in first attempt. – samkart Jul 20 '22 at 03:36

Below is the solution that worked for me. It is a simple UDF that takes two arrays of structs as input and returns a sequence of new structs whose fields are the superset of the fields of the two input structs:

case class ItemOne(a: String,
                   b: String,
                   c: String,
                   Q: String)

case class ItemTwo(x: String,
                   y: String,
                   z: String,
                   Q: String)

case class ItemThree(a: String,
                     b: String,
                     c: String,
                     x: String,
                     y: String,
                     z: String,
                     Q: String)


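import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf

val combineAuctionData = udf((arrayOne: Seq[Row], arrayTwo: Seq[Row]) => {
  // Both columns are nullable, so treat a null array as empty
  val ones = Option(arrayOne).getOrElse(Seq.empty[Row])
  val twos = Option(arrayTwo).getOrElse(Seq.empty[Row])

  // Map each ItemOne row (a, b, c, Q) to an ItemThree with x, y, z left null.
  // The fields are declared as String, so the placeholder is null, not None.
  val fromOne = ones.map { el =>
    ItemThree(el.getString(0), el.getString(1), el.getString(2),
              null, null, null,
              el.getString(3))
  }

  // Map each ItemTwo row (x, y, z, Q) to an ItemThree with a, b, c left null
  val fromTwo = twos.map { el =>
    ItemThree(null, null, null,
              el.getString(0), el.getString(1), el.getString(2),
              el.getString(3))
  }

  // Concatenate: every input element appears exactly once in the result
  fromOne ++ fromTwo
})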
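
For PySpark users, the same merge logic can be sketched in plain Python. This is not the code from the answer above: `merge_struct_arrays` and `FIELDS` are hypothetical names, and plain dicts stand in for the struct rows. The assumption is that a function like this could serve as the body of a Python UDF whose return type is declared as an array of structs with these seven string fields. Inside a real UDF the elements would arrive as `Row` objects, which would first need converting (for example with `Row.asDict()`).

```python
# Field order of the merged struct, following ItemThree above
FIELDS = ("a", "b", "c", "x", "y", "z", "Q")


def merge_struct_arrays(array_one, array_two):
    """Concatenate two lists of dict-like structs into one list whose
    elements carry the superset of fields; missing fields become None."""
    merged = []
    # A null array column arrives as None, so treat it as empty
    for element in list(array_one or []) + list(array_two or []):
        merged.append({name: element.get(name) for name in FIELDS})
    return merged


# Made-up sample values, one element per array
result = merge_struct_arrays(
    [{"a": "a1", "b": "b1", "c": "c1", "Q": "q"}],
    [{"x": "x1", "y": "y1", "z": "z1", "Q": "q"}],
)
```

Here `result` holds two elements: the first has x, y, z set to None, the second has a, b, c set to None, and both keep their own Q.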
Gligorijevic