
I am quite new to PySpark and this problem is boggling me. Basically, I am looking for a scalable way to loop over and typecast the fields of a StructType or ArrayType.

Example of my data schema:

root
 |-- _id: string (nullable = true)
 |-- created: timestamp (nullable = true)
 |-- card_rates: struct (nullable = true)
 |    |-- rate_1: integer (nullable = true)
 |    |-- rate_2: integer (nullable = true)
 |    |-- rate_3: integer (nullable = true)
 |    |-- card_fee: integer (nullable = true)
 |    |-- payment_method: string (nullable = true)
 |-- online_rates: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- rate_1: integer (nullable = true)
 |    |    |-- rate_2: integer (nullable = true)
 |    |    |-- online_fee: double (nullable = true)
 |-- updated: timestamp (nullable = true)

As you can see here, card_rates is a struct and online_rates is an array of structs. I am looking for a way to loop through all the fields above and conditionally typecast them. Ideally, if a field is supposed to be numeric it should be converted to double, and if it is supposed to be string it should be converted to string. I need to loop because those rate_* fields may grow over time.

But for now, I am content with being able to loop over them and typecast all of them to string, since I am very new to PySpark and still trying to get a feel for it.

My desired output schema:

root
 |-- _id: string (nullable = true)
 |-- created: timestamp (nullable = true)
 |-- card_rates: struct (nullable = true)
 |    |-- rate_1: double (nullable = true)
 |    |-- rate_2: double (nullable = true)
 |    |-- rate_3: double (nullable = true)
 |    |-- card_fee: double (nullable = true)
 |    |-- payment_method: string (nullable = true)
 |-- online_rates: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- rate_1: double (nullable = true)
 |    |    |-- rate_2: double (nullable = true)
 |    |    |-- online_fee: double (nullable = true)
 |-- updated: timestamp (nullable = true)

I am running out of ideas on how to do this.

I found a reference here: PySpark convert struct field inside array to string

but that solution hardcodes the field names and does not really loop over them.

Kindly help.

addicted

1 Answer


Here is one solution with the help of StructType.simpleString and the _parse_datatype_string helper from pyspark.sql.types:

from pyspark.sql.types import *
from pyspark.sql.types import _parse_datatype_string  # internal helper, not covered by the wildcard import

df_schema = StructType([
  StructField("_id", StringType(), True),
  StructField("created", TimestampType(), True),
  StructField("card_rates", StructType([
                  StructField("rate_1", IntegerType(), True),
                  StructField("rate_2", IntegerType(), True),
                  StructField("rate_3", IntegerType(), True),
                  StructField("card_fee", IntegerType(), True),
                  StructField("payment_method", StringType(), True)])),
  StructField("online_rates", ArrayType(
                  StructType(
                    [
                      StructField("rate_1", IntegerType(), True),
                      StructField("rate_2", IntegerType(), True),
                      StructField("online_fee", DoubleType(), True)
                    ]), True), True),
  StructField("updated", TimestampType(), True)])

schema_str = df_schema.simpleString() # this gives -> struct<_id:string,created:timestamp,card_rates:struct<rate_1:int,rate_2:int,rate_3:int,card_fee:int,payment_method:string>,online_rates:array<struct<rate_1:int,rate_2:int,online_fee:double>>,updated:timestamp>

double_schema = schema_str.replace(':int', ':double')

# convert back to StructType
final_schema = _parse_datatype_string(double_schema)
final_schema

  1. First convert your schema into a simple string with schema.simpleString
  2. Then replace all :int with :double
  3. Finally convert the modified string schema back into a StructType with _parse_datatype_string (a short usage sketch follows below)
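
For a quick sanity check, one way (just a sketch, assuming an active SparkSession named spark) is to build an empty DataFrame from the rebuilt schema and print its tree; the widened schema could then be applied by re-reading the raw source with it:

# sanity check: the former int fields should now show up as double
spark.createDataFrame([], final_schema).printSchema()

# one way to apply it: re-read the raw source with the widened schema
# ("data/rates.json" is a placeholder path, not from the original question)
df_double = spark.read.json("data/rates.json", schema=final_schema)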

UPDATE:

In order to avoid the issue with the backticks that @jxc pointed out, a better solution is a recursive scan through the schema elements, as shown next:

def transform_schema(schema):

  if schema is None:
    return StructType()

  updated = []
  for f in schema.fields:
    if isinstance(f.dataType, IntegerType):
      # if IntegerType convert to DoubleType
      updated.append(StructField(f.name, DoubleType(), f.nullable))
    elif isinstance(f.dataType, ArrayType):
      # if ArrayType unpack the element type, recurse, then wrap the result in ArrayType again
      # (assumes the array elements are structs, as in the schema above)
      updated.append(StructField(
          f.name,
          ArrayType(transform_schema(f.dataType.elementType), f.dataType.containsNull),
          f.nullable))
    elif isinstance(f.dataType, StructType):
      # if StructType recurse into the nested fields
      updated.append(StructField(f.name, transform_schema(f.dataType), f.nullable))
    else:
      # otherwise keep the field as-is, e.g. TimestampType, StringType etc.
      updated.append(StructField(f.name, f.dataType, f.nullable))

  return StructType(updated)

# call the function with your schema
transform_schema(df_schema)

Explanation: the function walks through each field (StructField) of the schema (StructType) and converts every IntegerType field to DoubleType, recursing into nested structs and arrays. Finally it hands the converted schema (StructType) back to the layer above (the parent StructType). A small illustration of why this sidesteps the backtick problem follows after the output below.

Output:

StructType(List(
  StructField(_id,StringType,true),
  StructField(created,TimestampType,true),
  StructField(card_rates,
              StructType(List(StructField(rate_1,DoubleType,true),
                              StructField(rate_2,DoubleType,true),
                              StructField(rate_3,DoubleType,true),
                              StructField(card_fee,DoubleType,true),
                              StructField(payment_method,StringType,true))),true),
  StructField(online_rates,ArrayType(
    StructType(List(
      StructField(rate_1,DoubleType,true),
      StructField(rate_2,DoubleType,true),
      StructField(online_fee,DoubleType,true))),true),true),
  StructField(updated,TimestampType,true)))
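
To illustrate the difference from the simpleString/replace approach (a toy example with an assumed dotted field name, not taken from the original question), the recursive function never round-trips the schema through a DDL string, so awkward field names need no backtick handling:

# toy schema with a dotted field name -- purely illustrative
tricky = StructType([
    StructField("my.col", StringType(), True),
    StructField("b1", IntegerType(), True)
])

# works directly on the StructType objects: b1 becomes double, my.col stays string
print(transform_schema(tricky))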

UPDATE: (2020-02-02)

And here is an example of how to use the new schema together with the existing dataframe:

updated_schema = transform_schema(df.schema)

# cast each column to the new type
select_expr = [df[f.name].cast(f.dataType) for f in updated_schema.fields]

df.select(*select_expr).show()
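
To confirm that the types actually changed (a small sketch, not part of the original answer), printing the schema of the casted result should match the desired output from the question:

df.select(*select_expr).printSchema()
# card_rates.* and online_rates.element.* should now report double for the former int fields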
abiratsis
  • If in the next month I have rate_3 and rate_4, can this adapt to that change? The problem is I'm not the one who creates `rate_3` and `rate_4`. Some other people do that, and I never know how many rates a row of data can have. That's why it has to be less hard-coded. – addicted Nov 28 '19 at 12:14
  • If rate_3 and rate_4 are int as well, the above Python script will convert them, yes. You just need to execute it every time the schema is updated – abiratsis Nov 28 '19 at 12:18
  • So my data is like this: some data has `rate_1` only, some data has `rate_1` and `rate_2`, some data has `rate_1`, `rate_2`, `rate_3`. Will that return an error if it is trying to convert a `rate_*` which is non-existent? – addicted Nov 28 '19 at 12:20
  • 1
    no because the above script does not depend on the column names at all. Initially you have a schema, this schema should be an instance StructType. We convert each int field to double without checking anything regarding column names. You can try to run the script line by line and see the output. If you face any issues let me know – abiratsis Nov 28 '19 at 12:24
  • ok I will need to try that first and see if I run into issues. Thanks! – addicted Nov 28 '19 at 12:28
  • @AlexandrosBiratsis, this will fail if any field names contain special chars like dot, space etc., which need to be enclosed in backticks. This is one thing we have to consider when using the DDL-formatted simpleString. – jxc Nov 28 '19 at 14:00
  • @jxc can you provide an example? I can't think of any case like that. Although I think addicted was pretty specific about the names, and it seems that the above way of using replace should work for his case – abiratsis Nov 28 '19 at 16:43
  • 1
    @AlexandrosBiratsis: `_parse_datatype_string("struct")`, while you need `_parse_datatype_string("struct<\`my.col\`:string,b1:int>")`. notice the dot in the field-name and the back-ticks which wont be added by `df.schema.simpleString()`. – jxc Nov 28 '19 at 16:50
  • @AlexandrosBiratsis, a more robust way is to use re.sub() to adjust the simpleString, so we can add the backticks and meanwhile map the field dtype. The same issue also applies to `schema_of_json()`, which also returns a DDL-formatted string without the backticks. However, if you are sure that no field names contain special chars like dot, spaces, hyphen etc., then `str.replace()` should be enough. – jxc Nov 28 '19 at 17:10
  • 1
    I agree @jxc, to be more correct I would prefer to implement a recursive solution. – abiratsis Nov 28 '19 at 17:26
  • @addicted I updated the answer with a new function that processes the elements of the schema recursively – abiratsis Dec 10 '19 at 08:05
  • Hi @addicted, did the above solution help finally? – abiratsis Jan 31 '20 at 12:23
  • 1
    Hi @alexandro sorry havent replied again. I will try within today or tomorrow and let you know. – addicted Jan 31 '20 at 13:03
  • No problem @addicted, I was just wondering if this is what you were trying to implement. I updated the answer with an example of how to use it on your existing code – abiratsis Feb 03 '20 at 15:30