I am trying to transform a dataframe in which one column holds a string with the following structure:

{

Key = [Value 1, Value 2, Value 3, Value 4, Value 1, Value 2, Value 3, Value 4 ...],

Key = [Value 1, Value 2, Value 3, Value 4, Value 1, Value 2, Value 3, Value 4 ...],

}

That is represented with values as below:

{

100=[800,0,100,0,2.168675, 800,0,100,0,.4954798], 

400=[800,0,400,0,3.987227, 800,0,400,0,.7282956], 

4000=[3200,0,4000,0,3.112903, 3200,0,4000,0,1.850587]

}

How can I transform the above string into the dataframe below, with the rows exploded?

[image: expected output dataframe]

Here is the data in the data frame with the schema:

[image: dataframe contents and schema]

After applying the suggested solution, the regex literal starting with r'... unfortunately raised an error saying "R Literals are not supported".

I therefore replaced the raw literals with escaped backslashes, and the following code now runs.

The only remaining issue is removing the [ character from the value1 column.

Any suggestions are appreciated. I will mark the suggested solution as accepted. Many thanks.

import pyspark.sql.functions as F

# read_delta_from_datalake and global_definitions are defined elsewhere in my environment.
data_source = "/curated/SensorMEDCurated"
df = read_delta_from_datalake(global_definitions["curated_dl_zone"], data_source)

print("Original Data:")
df.select("rms").show(1, truncate=False)

# Intended regex: \d+=\[[^\]]+\]  ==> any integer = any values in brackets
extract_all_pattern = "'(\\\\d+=\\\\[[^\\]]+\\])'"

# Pull out every "key=[...]" group, then give each group its own row.
df = df.withColumn("rms", F.expr(f"regexp_extract_all(rms, {extract_all_pattern}, 1)")) \
    .withColumn("rms", F.explode("rms"))

# Split each group into the key and the bracketed value list.
df = df.withColumn("outer_value", F.split(df["rms"], "=").getItem(0))
df = df.withColumn("values", F.split(df["rms"], "=").getItem(1))
df = df.withColumn("values", F.split("values", "[\\\\s*,\\\\s*]"))

df = df.select(["outer_value"] + [F.element_at("values", i).alias(f"value{i}") for i in range(1, 16)])

df.show()

Results: [image: output of df.show()]

Text output:

{100=[800,0,100,0,2.168675, 800,0,100,0,.4954798, 160,0,20,0,.4119049, 48,20,26,0,.1014838, 96,26,38,0,.1790891, 496,38,100,0,.1671498], 400=[800,0,400,0,3.987227, 800,0,400,0,.7282956, 210,0,105,0,.5492065, 590,105,400,0,.4716012], 4000=[3200,0,4000,0,3.112903, 3200,0,4000,0,1.850587, 82,0,102,0,.5790547, 17,102,123,0,.1790891, 408,123,633,0,.6745689, 62,633,710,0,.1910284, 405,710,1216,0,.4178745, 202,1216,1468,0,.2387854, 330,1468,1880,0,.2507247, 758,1880,2828,0,1.361077, 936,2828,3998,0,.6089029]}
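
A plain-Python check (not Spark) hints at where the stray [ comes from: splitting on "=" leaves both brackets attached to the value text. The sketch below is only an illustration under that assumption. It uses Python's `re` module instead of Spark's Java regex engine, and the variable names are made up for the example.

```python
import re

item = "100=[800,0,100,0,2.168675, 800,0,100,0,.4954798]"

# Splitting on "=" keeps both brackets attached to the value text.
outer_value, raw_values = item.split("=")

# Same pattern string as in the code above. Without a SQL parsing step it is
# the regex [\\s*,\\s*]: a character class of backslash, "s", "*" and ",".
naive = re.split("[\\\\s*,\\\\s*]", raw_values)

# Possible fix: drop the brackets first, then split on a comma with
# optional surrounding whitespace.
cleaned = re.split(r"\s*,\s*", raw_values.strip("[] "))

print(naive)
print(cleaned)
```

In the Spark code, the analogous change would presumably be an `F.regexp_replace` that removes `[` and `]` from the `values` column before `F.split`; that variant is untested here.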

Cengiz
  • There are multiple ways to do this. However, it's important that we know exactly how it appears in a dataframe before transformation. Are you able to post a sample input dataframe? – wwnde Dec 10 '22 at 02:57
  • Please post output of `df.select("column-name").limit(1)` and `df.printSchema()`. – Azhar Khan Dec 10 '22 at 06:08
  • Details are added above as requested. Thanks. – Cengiz Dec 10 '22 at 10:16

1 Answer

UPDATE

Updated the solution to work with newly provided input text:

df = spark.createDataFrame(
    data=[["{100=[ 800,0,100,0,2.168675, 800,0 ,100,0,.4954798, 160 ,0,20,0,.4119049, 48,20,26,0,.1014838, 96,26,38,0,.1790891, 496,38,100,0,.1671498 ], 400=[800,0,400,0,3.987227, 800,0,400,0,.7282956, 210,0,105,0,.5492065, 590,105,400,0,.4716012 ], 4000=[3200,0,4000,0,3.112903, 3200,0,4000,0,1.850587, 82,0,102,0,.5790547, 17,102,123,0,.1790891, 408,123,633,0,.6745689, 62,633,710,0,.1910284, 405,710,1216,0,.4178745, 202,1216,1468,0,.2387854, 330,1468,1880,0,.2507247, 758,1880,2828,0,1.361077, 936,2828,3998,0,.6089029]}"]], 
    schema=["column_str"]
)

import pyspark.sql.functions as F

df = df.withColumn("column_str", F.regexp_replace("column_str", r"\s", "")) \
       .withColumn("column_str", F.expr("regexp_extract_all(column_str, r'(\d+=\[[^\]]+\])', 1)")) \
       .withColumn("column_str", F.explode("column_str")) \
       .withColumn("outer_value", F.regexp_extract("column_str", r"(\d+)=\[[^\]]+\]", 1)) \
       .withColumn("values", F.regexp_extract("column_str", r"\d+=\[([^\]]+)\]", 1)) \
       .withColumn("values", F.split("values", r"\s*,\s*")) \
       .withColumn("values", F.transform("values", lambda x, i: F.create_map(F.concat(F.lit("value"), F.lpad(i + 1, 2, "0")), x))) \
       .withColumn("ID", F.monotonically_increasing_id()) \
       .withColumn("values", F.explode("values")) \
       .select("ID", "outer_value", F.explode("values")) \
       .groupBy("ID", "outer_value") \
       .pivot("key") \
       .agg(F.first("value")) \
       .drop("ID")

df.show(truncate=False)

+-----------+-------+-------+-------+-------+--------+-------+-------+-------+-------+--------+-------+-------+-------+-------+--------+-------+-------+-------+-------+--------+-------+-------+-------+-------+--------+-------+-------+-------+-------+--------+-------+-------+-------+-------+--------+-------+-------+-------+-------+--------+-------+-------+-------+-------+--------+-------+-------+-------+-------+--------+-------+-------+-------+-------+--------+
|outer_value|value01|value02|value03|value04|value05 |value06|value07|value08|value09|value10 |value11|value12|value13|value14|value15 |value16|value17|value18|value19|value20 |value21|value22|value23|value24|value25 |value26|value27|value28|value29|value30 |value31|value32|value33|value34|value35 |value36|value37|value38|value39|value40 |value41|value42|value43|value44|value45 |value46|value47|value48|value49|value50 |value51|value52|value53|value54|value55 |
+-----------+-------+-------+-------+-------+--------+-------+-------+-------+-------+--------+-------+-------+-------+-------+--------+-------+-------+-------+-------+--------+-------+-------+-------+-------+--------+-------+-------+-------+-------+--------+-------+-------+-------+-------+--------+-------+-------+-------+-------+--------+-------+-------+-------+-------+--------+-------+-------+-------+-------+--------+-------+-------+-------+-------+--------+
|100        |800    |0      |100    |0      |2.168675|800    |0      |100    |0      |.4954798|160    |0      |20     |0      |.4119049|48     |20     |26     |0      |.1014838|96     |26     |38     |0      |.1790891|496    |38     |100    |0      |.1671498|null   |null   |null   |null   |null    |null   |null   |null   |null   |null    |null   |null   |null   |null   |null    |null   |null   |null   |null   |null    |null   |null   |null   |null   |null    |
|400        |800    |0      |400    |0      |3.987227|800    |0      |400    |0      |.7282956|210    |0      |105    |0      |.5492065|590    |105    |400    |0      |.4716012|null   |null   |null   |null   |null    |null   |null   |null   |null   |null    |null   |null   |null   |null   |null    |null   |null   |null   |null   |null    |null   |null   |null   |null   |null    |null   |null   |null   |null   |null    |null   |null   |null   |null   |null    |
|4000       |3200   |0      |4000   |0      |3.112903|3200   |0      |4000   |0      |1.850587|82     |0      |102    |0      |.5790547|17     |102    |123    |0      |.1790891|408    |123    |633    |0      |.6745689|62     |633    |710    |0      |.1910284|405    |710    |1216   |0      |.4178745|202    |1216   |1468   |0      |.2387854|330    |1468   |1880   |0      |.2507247|758    |1880   |2828   |0      |1.361077|936    |2828   |3998   |0      |.6089029|
+-----------+-------+-------+-------+-------+--------+-------+-------+-------+-------+--------+-------+-------+-------+-------+--------+-------+-------+-------+-------+--------+-------+-------+-------+-------+--------+-------+-------+-------+-------+--------+-------+-------+-------+-------+--------+-------+-------+-------+-------+--------+-------+-------+-------+-------+--------+-------+-------+-------+-------+--------+-------+-------+-------+-------+--------+
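
The updated pipeline has to cope with rows that carry different numbers of values (30, 20 and 55 in the sample). As a rough plain-Python illustration of what the `transform` / `explode` / `pivot` chain appears to achieve, the sketch below names each value by its zero-padded position and fills missing positions with `None`. The helper `to_wide_rows` is hypothetical and only mimics the result, not Spark's execution.

```python
def to_wide_rows(parsed):
    """parsed: list of (outer_value, [values...]); returns a list of dicts."""
    width = max(len(values) for _, values in parsed)
    # Zero-padded names (value01, value02, ...) stay in positional order when
    # sorted as strings, which is presumably why the Spark code uses lpad.
    columns = [f"value{i:02d}" for i in range(1, width + 1)]
    rows = []
    for outer_value, values in parsed:
        # Shorter rows are padded with None, like the nulls in the table above.
        padded = values + [None] * (width - len(values))
        row = {"outer_value": outer_value}
        row.update(zip(columns, padded))
        rows.append(row)
    return rows


parsed = [
    ("100", ["800", "0", "100"]),
    ("400", ["800", "0"]),
]
for row in to_wide_rows(parsed):
    print(row)
```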


The logic is:

  • Use regex to split string in form "outer_value=[value1, value2, ...]".
  • Use regex to extract "outer_value".
  • Use regex to extract array "value1, value2, ...".
  • Split the values on commas.
  • Select the required value columns by index.

df = spark.createDataFrame(
    data=[[
        """{

        100=[800,0,100,0,2.168675, 800,0,100,0,.4954798],

        400=[800,0,400,0,3.987227, 800,0,400,0,.7282956],

        4000=[3200,0,4000,0,3.112903, 3200,0,4000,0,1.850587]

        }"""       
    ]], 
    schema=["column_str"]
)

import pyspark.sql.functions as F

df = df.withColumn("column_str", F.expr("regexp_extract_all(column_str, r'(\d+=\[[^\]]+\])', 1)")) \
       .withColumn("column_str", F.explode("column_str")) \
       .withColumn("outer_value", F.regexp_extract("column_str", r"(\d+)=\[[^\]]+\]", 1)) \
       .withColumn("values", F.regexp_extract("column_str", r"\d+=\[([^\]]+)\]", 1)) \
       .withColumn("values", F.split("values", r"\s*,\s*")) \
       .select(["outer_value"] + [F.element_at("values", i).alias(f"value{i}") for i in range(1,11)])

df.show()

+-----------+------+------+------+------+--------+------+------+------+------+--------+
|outer_value|value1|value2|value3|value4|  value5|value6|value7|value8|value9| value10|
+-----------+------+------+------+------+--------+------+------+------+------+--------+
|        100|   800|     0|   100|     0|2.168675|   800|     0|   100|     0|.4954798|
|        400|   800|     0|   400|     0|3.987227|   800|     0|   400|     0|.7282956|
|       4000|  3200|     0|  4000|     0|3.112903|  3200|     0|  4000|     0|1.850587|
+-----------+------+------+------+------+--------+------+------+------+------+--------+
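
The steps listed above can also be checked without a Spark session. The following is a minimal plain-Python sketch using the standard `re` module, not the Spark code itself. The helper name `parse_pairs` is hypothetical, and Python's regex engine is assumed to behave like Spark's Java-based one for these simple patterns.

```python
import re


def parse_pairs(text):
    """Return (outer_value, [values...]) tuples from 'key=[v1, v2, ...]' groups."""
    rows = []
    # Step 1: find every "digits=[...]" group in the string.
    for group in re.findall(r"\d+=\[[^\]]+\]", text):
        # Steps 2 and 3: capture the key and the text inside the brackets.
        match = re.fullmatch(r"(\d+)=\[([^\]]+)\]", group)
        outer_value, inner = match.group(1), match.group(2)
        # Step 4: split on commas, ignoring surrounding whitespace.
        values = re.split(r"\s*,\s*", inner.strip())
        rows.append((outer_value, values))
    return rows


sample = (
    "{100=[800,0,100,0,2.168675, 800,0,100,0,.4954798], "
    "400=[800,0,400,0,3.987227, 800,0,400,0,.7282956]}"
)
for outer_value, values in parse_pairs(sample):
    print(outer_value, values)
```

Selecting columns by index (the last step) then amounts to indexing into each `values` list.
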
Azhar Khan
  • After applying the suggested solution, the regex literal starting with r'... unfortunately raised an error saying "R Literals are not supported". I therefore replaced the raw literals with escaped backslashes to make the code work. The only remaining issue is removing the [ character from the value1 column. Any suggestions are appreciated. I will mark the suggested solution as accepted. Many thanks. – Cengiz Dec 12 '22 at 15:05
  • Please share a sample code to create your dataframe. I did not have it so I used the string representation from question. – Azhar Khan Dec 12 '22 at 15:18
  • Or post output of `df.select("rms").limit(1)` as text. Preferably the string for which the solution is not working. – Azhar Khan Dec 12 '22 at 15:26
  • Please share as text. It is hard to use input from image. – Azhar Khan Dec 12 '22 at 15:34
  • Here is the text output. Thanks... {100=[800,0,100,0,2.168675, 800,0,100,0,.4954798, 160,0,20,0,.4119049, 48,20,26,0,.1014838, 96,26,38,0,.1790891, 496,38,100,0,.1671498], 400=[800,0,400,0,3.987227, 800,0,400,0,.7282956, 210,0,105,0,.5492065, 590,105,400,0,.4716012], 4000=[3200,0,4000,0,3.112903, 3200,0,4000,0,1.850587, 82,0,102,0,.5790547, 17,102,123,0,.1790891, 408,123,633,0,.6745689, 62,633,710,0,.1910284, 405,710,1216,0,.4178745, 202,1216,1468,0,.2387854, 330,1468,1880,0,.2507247, 758,1880,2828,0,1.361077, 936,2828,3998,0,.6089029]} – Cengiz Dec 12 '22 at 21:06
  • Updated the solution to work with the newly provided input text. Let me know if it works. In your code, I saw that you have lots of escaped backslashes. They are very difficult to read, and it is not clear why it is done like that. – Azhar Khan Dec 13 '22 at 11:13
  • Thanks a lot. The reason I am replacing the r'...' literals with \\\\ escapes is explained here: https://stackoverflow.com/questions/72993096/regexp-extract-pyspark-sql-parseexception-literals-of-type-r-are-currently-no/72993995 I am getting this error: ParseException Traceback (most recent call last) df = df.withColumn("column_str", F.regexp_replace("column_str", r"\s", "")) .withColumn("column_str", F.expr("regexp_extract_all(column_str, ParseException: Literals of type 'R' are currently not supported.(line 1, pos 31) – Cengiz Dec 13 '22 at 12:54
  • Is the updated solution working now? – Azhar Khan Dec 13 '22 at 13:03
  • It is still giving the same error because of the r'...' literal used for the regex. I will try replacing it with \\\\ escapes. – Cengiz Dec 13 '22 at 14:50
  • It all works now, after replacing withColumn("column_str", F.expr("regexp_extract_all(column_str, r'(\d+=\[[^\]]+\])', 1)")) \ with .withColumn("column_str", F.expr("regexp_extract_all(column_str, '(\\\\d+=\\\\[[^\\]]+\\])', 1)")) \ Many thanks for making this happen. – Cengiz Dec 13 '22 at 15:49
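
The fix reported in the last comment relies on two layers of unescaping: Python first turns `\\\\` into `\\`, and Spark's SQL parser (with its default string-literal settings, as far as I know) then turns `\\` into `\`. The sketch below imitates that second step with a hypothetical helper, `sql_unescape`, which is a simplification of the real parser. It checks the resulting pattern with Python's `re` module rather than Spark's Java regex engine.

```python
import re


def sql_unescape(literal_body):
    """Crude stand-in for SQL string-literal unescaping: drop one backslash level."""
    return re.sub(r"\\(.)", r"\1", literal_body)


# What was typed inside the Python (non-raw) string passed to F.expr.
python_source = "(\\\\d+=\\\\[[^\\]]+\\])"

# After Python's own unescaping, this is the text the SQL parser would receive.
print(python_source)

# After the simulated SQL unescaping, this is the regex that would be compiled.
regex = sql_unescape(python_source)
print(regex)

sample = "{100=[800,0,100], 400=[800,0,400]}"
print(re.findall(regex, sample))
```

Real SQL unescaping also handles sequences such as `\n` and `\t` specially, which this stand-in ignores.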