I have the following schema:

root
 |-- data: struct (nullable = true)
 |    |-- user_agent_DEVICE_METADATA: string (nullable = true)
 |    |-- tags_0_epochTs_DEVICE_METADATA: string (nullable = true)
 |    |-- tags_0_name_DEVICE_METADATA: string (nullable = true)
 |    |-- tags_0_timestamp_DEVICE_METADATA: string (nullable = true)
 |    |-- tags_1_epochTs_DEVICE_METADATA: string (nullable = true)
 |    |-- tags_1_name_DEVICE_METADATA: string (nullable = true)
 |    |-- tags_1_timestamp_DEVICE_METADATA: string (nullable = true)
 |    |-- tags_1_value_DEVICE_METADATA: string (nullable = true)
 |    |-- tags_2_epochTs_DEVICE_METADATA: string (nullable = true)
 |    |-- tags_2_name_DEVICE_METADATA: string (nullable = true)
 |    |-- tags_2_timestamp_DEVICE_METADATA: string (nullable = true)
 |    |-- tags_2_value_DEVICE_METADATA: string (nullable = true)

And I need to add a column with the tags as an array of structs like this:

root
 |-- data: struct (nullable = true)
 |    |-- user_agent_DEVICE_METADATA: string (nullable = true)
 |    |-- tags_0_epochTs_DEVICE_METADATA: string (nullable = true)
 |    |-- tags_0_name_DEVICE_METADATA: string (nullable = true)
 |    |-- tags_0_timestamp_DEVICE_METADATA: string (nullable = true)
 |    |-- tags_1_epochTs_DEVICE_METADATA: string (nullable = true)
 |    |-- tags_1_name_DEVICE_METADATA: string (nullable = true)
 |    |-- tags_1_timestamp_DEVICE_METADATA: string (nullable = true)
 |    |-- tags_1_value_DEVICE_METADATA: string (nullable = true)
 |    |-- tags_2_epochTs_DEVICE_METADATA: string (nullable = true)
 |    |-- tags_2_name_DEVICE_METADATA: string (nullable = true)
 |    |-- tags_2_timestamp_DEVICE_METADATA: string (nullable = true)
 |    |-- tags_2_value_DEVICE_METADATA: string (nullable = true)
 |-- device_metadata_tags: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- value: string (nullable = true)
 |    |    |-- epochTs: double (nullable = true)
 |    |    |-- timestamp: double (nullable = true)

But as you can see, the tags don't always have all of the columns I need; for example, there is no tags_0_value_DEVICE_METADATA. I also don't know how long the list of tags is: here it's 3, but it can change. Is there a way to do something like that?

Ema Il

2 Answers

You can use the DataFrame's columns attribute to get all column names and then use those names to restructure the data:

1. Get the column names and extract the tags and the suffixes

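import re

df = ...
# All field names inside the "data" struct, minus the one non-tag column
cols = df.select("data.*").columns
cols.remove("user_agent_DEVICE_METADATA")

# Distinct tag prefixes ("tags_0", ...) and suffixes ("epochTs_DEVICE_METADATA", ...)
tag = set(map(lambda s: re.search('(tags_.*?)_', s).group(1), cols))
suffix = set(map(lambda s: re.search('(tags_.*?)_(.*)', s).group(2), cols))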

Because tag and suffix are sets, neither the order of the fields within each struct nor the order of the structs within the array is fixed. If the order matters, tag and suffix can be sorted.
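
As a minimal sketch of that sorting (not part of the original answer): a plain sorted() compares the names as text, so "tags_10" would land before "tags_2" once there are ten or more tags. Sorting on the numeric index avoids that. The column list below is a made-up sample, and tag_index is a hypothetical helper name.

```python
import re

# Hypothetical sample of field names, in the shape used in the question.
cols = [
    "tags_10_name_DEVICE_METADATA",
    "tags_2_name_DEVICE_METADATA",
    "tags_2_value_DEVICE_METADATA",
    "tags_0_name_DEVICE_METADATA",
]

def tag_index(tag):
    """Return the numeric part of a tag prefix such as 'tags_10'."""
    return int(tag.split("_")[1])

# Same regex extraction as in step 1, then sort tags by number, not as text.
tag = sorted({re.search(r"(tags_.*?)_", s).group(1) for s in cols}, key=tag_index)
suffix = sorted({re.search(r"(tags_.*?)_(.*)", s).group(2) for s in cols})

print(tag)     # ['tags_0', 'tags_2', 'tags_10']
print(suffix)  # ['name_DEVICE_METADATA', 'value_DEVICE_METADATA']
```

With tag and suffix as sorted lists, the generators in step 2 iterate in a stable order, so the array elements line up with the tag numbers.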

2. Calculate the column object for the structs

For each tag a struct is constructed. Each struct consists of all possible suffixes. If a tag/suffix combination does not exist in the original dataframe, it is filled with null.

from pyspark.sql import functions as F

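def getStructElements(t):
    # One struct field per known suffix, named by the part before the first "_"
    for s in suffix:
        col = t + "_" + s
        name = re.search('(.*?)_', s).group(1)
        if col in cols:
            yield F.col("data." + col).alias(name)
        else:
            # Tag/suffix combination missing from the schema: fill with null
            yield F.lit(None).alias(name)

def createStructs():
    # One struct per tag
    for t in tag:
        yield F.struct(list(getStructElements(t)))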

3. Select the data in the required format

df.select(F.col("data"), F.array(list(createStructs())).alias("device_metadata_tags"))
werner
    You should add sorted to tag, like `tag=sorted(set(map(lambda s: re.search('(tags_.*?)_',s).group(1),cols)))`. What you wrote sometimes returned the wrong order for me, and then the array did not match the original columns. – Ema Il Jun 21 '23 at 12:25

If your data columns are well structured (the naming is always tags_x_y_DEVICE_METADATA) and your output fields are fixed to name, value, epochTs, and timestamp, you can try this approach.

First, collect the tags_x_* columns into a struct, filling any missing column with null.

from pyspark.sql import functions as F

output_cols = ['name', 'value', 'epochTs', 'timestamp']

# For tags_0_***: use the column if it exists in data, otherwise a null literal
F.struct(*[F.col(f'data.tags_0_{c}_DEVICE_METADATA').alias(c)
           if f'tags_0_{c}_DEVICE_METADATA' in df.select('data.*').columns
           else F.lit(None).alias(c)
           for c in output_cols])

Repeat this for tags_0, tags_1, and tags_2, then collect all the structs into an array.

Final code.

# range(3) covers tags_0..tags_2 from the example schema
df = (df.select('data', *[F.struct(*[F.col(f'data.tags_{num}_{c}_DEVICE_METADATA').alias(c)
                                     if f'tags_{num}_{c}_DEVICE_METADATA' in df.select('data.*').columns
                                     else F.lit(None).alias(c)
                                     for c in output_cols]
                                  ).alias(f'tags_{num}')
                          for num in range(3)])
      .select('data', F.array(*[F.col(f'tags_{num}') for num in range(3)]).alias('device_metadata_tags')))
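
The range(3) above matches the example schema. Since the question says the number of tags can change, one possible way to derive the indices from the column names is sketched below. It is plain Python over a sample column list copied from the question; in a real job the list would presumably come from df.select('data.*').columns. The names data_cols, tag_indices and source_column are hypothetical, and the sketch assumes every tag column follows the tags_x_y_DEVICE_METADATA pattern.

```python
import re

output_cols = ["name", "value", "epochTs", "timestamp"]

# Field names copied from the question's schema (assumption: in practice this
# list would be read from the DataFrame instead of being written out).
data_cols = [
    "user_agent_DEVICE_METADATA",
    "tags_0_epochTs_DEVICE_METADATA",
    "tags_0_name_DEVICE_METADATA",
    "tags_0_timestamp_DEVICE_METADATA",
    "tags_1_epochTs_DEVICE_METADATA",
    "tags_1_name_DEVICE_METADATA",
    "tags_1_timestamp_DEVICE_METADATA",
    "tags_1_value_DEVICE_METADATA",
    "tags_2_epochTs_DEVICE_METADATA",
    "tags_2_name_DEVICE_METADATA",
    "tags_2_timestamp_DEVICE_METADATA",
    "tags_2_value_DEVICE_METADATA",
]

pattern = re.compile(r"tags_(\d+)_(\w+)_DEVICE_METADATA")

# Distinct tag numbers found in the column names, sorted numerically.
tag_indices = sorted(
    {int(m.group(1)) for m in map(pattern.fullmatch, data_cols) if m}
)

def source_column(num, field):
    """Return the matching column name, or None if the schema lacks it."""
    name = f"tags_{num}_{field}_DEVICE_METADATA"
    return name if name in data_cols else None

print(tag_indices)                # [0, 1, 2]
print(source_column(0, "value"))  # None
print(source_column(1, "value"))  # tags_1_value_DEVICE_METADATA
```

Iterating over tag_indices instead of range(3) in both places of the final code would then follow whatever tags the schema actually contains.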
Emma
    I wish I could accept your answer as well! This worked for me without changing anything; I just chose the first answer because I like generators. – Ema Il Jun 21 '23 at 12:26