I have a column called event_data in JSON format in my Spark DataFrame. After parsing it with from_json, I get this schema:

root
 |-- user_id: string (nullable = true)
 |-- event_data: struct (nullable = true)
 |    |-- af_content_id: string (nullable = true)
 |    |-- af_currency: string (nullable = true)
 |    |-- af_order_id: long (nullable = true)

I only need af_content_id from this column. This attribute can be of different formats:

  • a String
  • an Integer
  • a List of Int and Str, e.g. ['ghhjj23','123546',12356]
  • None (sometimes event_data doesn't contain af_content_id)

    I want to use the explode function to return a new row for each element in af_content_id when it is a list. But when I apply it, I get an error:

    from pyspark.sql.functions import explode
    
    def get_content_id(column):
        return column.af_content_id
    
    df_transf_1 = df_transf_1.withColumn(
        "products_basket", 
        get_content_id(df_transf_1.event_data)
    )
    
    df_transf_1 = df_transf_1.withColumn(
        "product_id",
        explode(df_transf_1.products_basket)
    )
    

    cannot resolve 'explode(products_basket)' due to data type mismatch: input to function explode should be array or map type, not StringType;

    I know the reason: the field af_content_id can hold different types. But I don't know how to resolve it. Using pyspark.sql.functions.array() directly on the column doesn't work because the result becomes an array of arrays, and explode does not produce the expected result.

    Sample code to reproduce the step I'm stuck on:

    import pandas as pd
    
    arr = [
        ['b5ad805c-f295-4852-82fc-961a88',12732936],
        ['0FD6955D-484C-4FC8-8C3F-DA7D28',['Gklb38','123655']],
        ['0E3D17EA-BEEF-4931-8104','12909841'],
        ['CC2877D0-A15C-4C0A-AD65-762A35C1',[12645715, 12909837, 12909837]]
    ]
    
    df = pd.DataFrame(arr, columns = ['user_id','products_basket'])
    
    df = df[['user_id','products_basket']].astype(str)
    df_transf_1 = spark.createDataFrame(df)
    

    I'm looking for a way to convert products_basket to a single format, an array, so that when I apply explode, each row contains one id.

    • The reason is that `af_content_id` is a `StringType`, which can't be exploded. It may *look* like it is sometimes a list and sometimes a string, but it's actually always a string. There may be workarounds for this; please provide a small [reproducible example](https://stackoverflow.com/questions/48427185/how-to-make-good-reproducible-apache-spark-dataframe-examples) with your desired output. – pault Nov 27 '18 at 16:18

    1 Answer

    If you are starting with a DataFrame like:

    df_transf_1.show(truncate=False)
    #+--------------------------------+------------------------------+
    #|user_id                         |products_basket               |
    #+--------------------------------+------------------------------+
    #|b5ad805c-f295-4852-82fc-961a88  |12732936                      |
    #|0FD6955D-484C-4FC8-8C3F-DA7D28  |['Gklb38', '123655']          |
    #|0E3D17EA-BEEF-4931-8104         |12909841                      |
    #|CC2877D0-A15C-4C0A-AD65-762A35C1|[12645715, 12909837, 12909837]|
    #+--------------------------------+------------------------------+
    

    where the products_basket column is a StringType:

    df_transf_1.printSchema()
    #root
    # |-- user_id: string (nullable = true)
    # |-- products_basket: string (nullable = true)
    

    You can't call explode on products_basket because it's not an array or map.

    One workaround is to remove any leading/trailing square brackets and then split the string on ", " (comma followed by a space). This will convert the string into an array of strings.

    from pyspark.sql.functions import col, regexp_replace, split
    
    # Strip outer brackets and single quotes, then split on ", " into an array of strings
    df_transf_new = df_transf_1.withColumn(
        "products_basket",
        split(regexp_replace(col("products_basket"), r"(^\[)|(\]$)|(')", ""), ", ")
    )
    
    df_transf_new.show(truncate=False)
    #+--------------------------------+------------------------------+
    #|user_id                         |products_basket               |
    #+--------------------------------+------------------------------+
    #|b5ad805c-f295-4852-82fc-961a88  |[12732936]                    |
    #|0FD6955D-484C-4FC8-8C3F-DA7D28  |[Gklb38, 123655]              |
    #|0E3D17EA-BEEF-4931-8104         |[12909841]                    |
    #|CC2877D0-A15C-4C0A-AD65-762A35C1|[12645715, 12909837, 12909837]|
    #+--------------------------------+------------------------------+
    

    The regular expression pattern matches any of the following:

    • (^\[): An opening square bracket at the start of the string
    • (\]$): A closing square bracket at the end of the string
    • ('): Any single quote (because your strings are quoted)

    and replaces these with an empty string.

    This assumes that your data does not contain any needed single quotes or square brackets inside the products_basket column.
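The effect of the pattern can be checked without a Spark session. Below is a minimal plain-Python sketch that applies the same regular expression with `re.sub` and then splits on ", ". The helper name `to_list` is hypothetical and only for illustration; it assumes that Python's `re` and Spark's Java regex engine agree on this simple pattern.

```python
import re

# Same pattern as in the regexp_replace call above.
PATTERN = r"(^\[)|(\]$)|(')"

def to_list(raw, sep=", "):
    """Strip a leading '[', a trailing ']' and all single quotes, then split on sep."""
    cleaned = re.sub(PATTERN, "", raw)
    return cleaned.split(sep)

# The four products_basket strings from the sample DataFrame.
samples = [
    "12732936",
    "['Gklb38', '123655']",
    "12909841",
    "[12645715, 12909837, 12909837]",
]
results = [to_list(s) for s in samples]
for raw, parsed in zip(samples, results):
    print(raw, "->", parsed)

# The stated assumption matters: a single quote inside an id is removed too.
quoted = to_list("['O'Neil', 'x']")
print(quoted)
```

The last call shows why the assumption is needed: an id containing a single quote would silently lose it.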

    After the split, the schema of the new DataFrame is:

    df_transf_new.printSchema()
    #root
    # |-- user_id: string (nullable = true)
    # |-- products_basket: array (nullable = true)
    # |    |-- element: string (containsNull = true)
    

    Now you can call explode:

    from pyspark.sql.functions import explode
    df_transf_new.withColumn("product_id", explode("products_basket")).show(truncate=False)
    #+--------------------------------+------------------------------+----------+
    #|user_id                         |products_basket               |product_id|
    #+--------------------------------+------------------------------+----------+
    #|b5ad805c-f295-4852-82fc-961a88  |[12732936]                    |12732936  |
    #|0FD6955D-484C-4FC8-8C3F-DA7D28  |[Gklb38, 123655]              |Gklb38    |
    #|0FD6955D-484C-4FC8-8C3F-DA7D28  |[Gklb38, 123655]              |123655    |
    #|0E3D17EA-BEEF-4931-8104         |[12909841]                    |12909841  |
    #|CC2877D0-A15C-4C0A-AD65-762A35C1|[12645715, 12909837, 12909837]|12645715  |
    #|CC2877D0-A15C-4C0A-AD65-762A35C1|[12645715, 12909837, 12909837]|12909837  |
    #|CC2877D0-A15C-4C0A-AD65-762A35C1|[12645715, 12909837, 12909837]|12909837  |
    #+--------------------------------+------------------------------+----------+
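To make the row multiplication concrete, here is a rough plain-Python sketch of what explode does to the split rows. It also covers the None case mentioned in the question. The function `explode_rows` and the third row are hypothetical and not part of the sample data.

```python
def explode_rows(rows, outer=False):
    """Yield one (user_id, product_id) pair per array element.

    outer=False mimics explode: a null or empty array yields no row.
    outer=True mimics explode_outer: such a row is kept with a None value.
    """
    for user_id, basket in rows:
        if not basket:
            if outer:
                yield (user_id, None)
            continue
        for product_id in basket:
            yield (user_id, product_id)

rows = [
    ("b5ad805c-f295-4852-82fc-961a88", ["12732936"]),
    ("0FD6955D-484C-4FC8-8C3F-DA7D28", ["Gklb38", "123655"]),
    ("user-without-content-id", None),  # hypothetical row with no af_content_id
]

exploded = list(explode_rows(rows))
exploded_outer = list(explode_rows(rows, outer=True))
print(exploded)
print(exploded_outer)
```

In Spark itself, `explode` drops rows whose array is null, so users without an af_content_id would disappear from the result. If those rows need to be kept, `pyspark.sql.functions.explode_outer` may be the better fit.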
    
    • I tried to do it as follows: `df_transf_2 = df_transf_1.withColumn("products_list", regexp_replace('products_basket','\[|\]',''))` `df_transf_2 = df_transf_2.withColumn("products_list", array("products_list"))` and then used `explode("products_list")`, but it didn't work. Why does applying `array()` not work in this case, and why should I use `split("products_list",", ")` instead? Note that I got the same schema that you got before explode. – SarahData Nov 27 '18 at 18:05
    • You have to call `split()` after the `regexp_replace`. Calling `array` is not appropriate here. – pault Nov 27 '18 at 18:22
    • Yes, I know it's not appropriate since it didn't lead to the expected output, but why? I assumed that once all elements of `products_basket` were in the format `'ghhjj23','123546',12356`, applying array would add the brackets and explode would return a new row for each element. – SarahData Nov 27 '18 at 18:37
    • If you apply `array()` to a string, it will become an array with one element (the string). The `array()` function has no knowledge that the commas should be used as delimiters. If you call split, it will split the string into multiple elements and return an array. Does that make sense? – pault Nov 27 '18 at 18:46
    • One potentially confusing aspect is that `array(regexp_replace("products_basket", r"(^\[)|(\]$)|(')", ""))` and `split(regexp_replace("products_basket", r"(^\[)|(\]$)|(')", ""), ", ")` will both *print* the same way to the console if you call `show()`, but the underlying data is different. The latter is what you want and the former is just an array with one string. – pault Nov 27 '18 at 18:54
    • I applied your proposed solution to my actual DataFrame (not the sample) and it didn't work: explode behaved as it did when I applied `array()`, treating the whole array as one element. Also, after the split, `show()` prints the output without a space after the comma. I'm trying to understand why. – SarahData Nov 27 '18 at 20:43
    • @SarahData maybe that's because I split on `", "` when you want to just split on `","` (no space). It's hard to say without seeing your actual data, but it seems like the string is not being split because the pattern is not matching. You can also pass in regex patterns to split like `",(\s+)?"` which means a comma followed by optional whitespace. – pault Nov 27 '18 at 20:48
    • Yes, you're right. I rechecked the parameters of `split()` and the column from the previous step (after applying only `regexp_replace`) and saw that the delimiter should be `","`, not `", "`. Thanks! – SarahData Nov 27 '18 at 21:05
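The two points from this comment thread can be illustrated in plain Python: wrapping a string in an array versus splitting it, and what happens when the delimiter doesn't match. This is only an analogy to the Spark functions, and the value of `cleaned` is a hypothetical string with no space after the comma.

```python
import re

cleaned = "Gklb38,123655"  # hypothetical value: no space after the comma

# Analogous to array(col): the whole string becomes the single element.
as_array = [cleaned]

# Analogous to split(col, ", "): the delimiter never matches, so still one element.
split_comma_space = cleaned.split(", ")

# Analogous to split(col, r",\s*"): a comma followed by optional whitespace.
split_tolerant = re.split(r",\s*", cleaned)

print(as_array)           # one element
print(split_comma_space)  # one element, same as as_array
print(split_tolerant)     # two elements

# The tolerant pattern handles both delimiter styles in one string.
mixed = re.split(r",\s*", "a, b,c")
print(mixed)
```

Because Spark's `split` treats its second argument as a regular expression, a pattern such as `r",\s*"` should work there as well, which avoids having to know in advance whether the data uses `","` or `", "`.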