
I need help parsing a string that contains values for each attribute. Below is my sample string:

Type=<Series VR> Model=<1Ac4> ID=<34> conn seq=<2>
Type=<SeriesX> Model=<12Q3> ID=<231> conn seq=<3423123>

From the above, I have to generate columns with values as below:

Type      | Model | Id  | conn seq
----------+-------+-----+---------
Series VR | 1Ac4  | 34  | 2
SeriesX   | 12Q3  | 231 | 3423123

I am not sure how to parse it with regex/split and withColumn().

Help is appreciated. Thanks.

marc

1 Answer


Based on your sample, you can convert the String into a Map using the Spark SQL function str_to_map and then select values from the desired map keys (the code below assumes the StringType column is named value):

from pyspark.sql import functions as F

keys = ['Type', 'Model', 'ID', 'conn seq']

df.withColumn("m", F.expr("str_to_map(value, '> *', '=<')")) \
    .select("*", *[ F.col('m')[k].alias(k) for k in keys ]) \
    .show()
+--------------------+--------------------+---------+-----+---+--------+
|               value|                   m|     Type|Model| ID|conn seq|
+--------------------+--------------------+---------+-----+---+--------+
|Type=<Series VR> ...|[Type -> Series V...|Series VR| 1Ac4| 34|       2|
|Type=<SeriesX> Mo...|[Type -> SeriesX,...|  SeriesX| 12Q3|231| 3423123|
+--------------------+--------------------+---------+-----+---+--------+

Notes: here we use the regex pattern > * to split pairs and the pattern =< to split key from value. Check this link if the keys of the Map are dynamic and cannot be predefined; just make sure to filter out the EMPTY key.
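For reference, a minimal sketch of loading the sample strings into a DataFrame with a single StringType column named value (the setup assumed by the code above):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# sample rows from the question, in a single StringType column named `value`
df = spark.createDataFrame([
    ("Type=<Series VR> Model=<1Ac4> ID=<34> conn seq=<2>",),
    ("Type=<SeriesX> Model=<12Q3> ID=<231> conn seq=<3423123>",),
], ["value"])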

Edit: based on the comments, here is how to do a case-insensitive search on the map keys. For Spark 2.3, we can use pandas_udf to preprocess the value column before using the str_to_map function (a plain udf fallback for environments without PyArrow is sketched after the steps below):

  1. Set up the regex pattern for the matched keys (in capturing group 1). Here we use (?i) for a case-insensitive match, and add two anchors, \b and (?==), so that the matched sub-strings must have a word boundary to the left and be followed by an = sign to the right.

    ptn = "(?i)\\b({})(?==)".format('|'.join(keys))
    print(ptn)
    #(?i)\b(Type|Model|ID|conn seq)(?==)
    
  2. Set up a pandas_udf so we can use Series.str.replace() with a callback (lowercasing group 1) as the replacement:

    lower_keys = F.pandas_udf(lambda s: s.str.replace(ptn, lambda m: m.group(1).lower()), "string")
    
  3. Convert all matched keys to lowercase:

    df1 = df.withColumn('value', lower_keys('value'))
    df1.show(truncate=False)
    +-------------------------------------------------------+
    |value                                                  |
    +-------------------------------------------------------+
    |type=<Series VR> model=<1Ac4> id=<34> conn seq=<2>     |
    |type=<SeriesX> model=<12Q3> id=<231> conn seq=<3423123>|
    +-------------------------------------------------------+
    
  4. Use str_to_map to create the map, and then use k.lower() as keys to look up their corresponding values.

    df1.withColumn("m", F.expr("str_to_map(value, '> *', '=<')")) \
        .select("*", *[ F.col('m')[k.lower()].alias(k) for k in keys ]) \
        .show()
    

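If pandas_udf cannot be used (for example when PyArrow is not installed, as came up in the comments below), a plain Python udf can do the same preprocessing. A minimal sketch, reusing the same keys list:

import re

# same pattern as above, pre-compiled; matches the known keys case-insensitively
ptn_re = re.compile(r"(?i)\b({})(?==)".format('|'.join(keys)))

# plain udf fallback for Spark 2.3 without PyArrow
lower_keys = F.udf(lambda x: ptn_re.sub(lambda m: m.group(1).lower(), x) if x else x, "string")

df1 = df.withColumn('value', lower_keys('value'))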
Note: if you can use Spark 3.0+ in the future, skip the above steps and use the transform_keys function instead:

df.withColumn("m", F.expr("str_to_map(value, '> *', '=<')")) \
    .withColumn("m", F.expr("transform_keys(m, (k,v) -> lower(k))")) \
    .select("*", *[ F.col('m')[k.lower()].alias(k) for k in keys ]) \
    .show()

For Spark 2.4+, replace transform_keys(...) with the following:

map_from_entries(transform(map_keys(m), k -> (lower(k), m[k])))
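
Putting it together for Spark 2.4 (as also noted in the comments), a sketch of the full pipeline with the post-processed map:

df.withColumn("m", F.expr("str_to_map(value, '> *', '=<')")) \
    .withColumn("m", F.expr("map_from_entries(transform(map_keys(m), k -> (lower(k), m[k])))")) \
    .select("*", *[ F.col('m')[k.lower()].alias(k) for k in keys ]) \
    .show()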
jxc
  • Along with that String, there are other columns in the DF. How do I fetch those as well, along with these new columns from the map? thanks – marc Oct 22 '20 at 23:31
  • I tried that, except I want to check whether to keep * in the second select without specifying all the column names. – marc Oct 22 '20 at 23:40
  • Gotcha, I realized I missed the '*' in front of the list. It worked. Thanks for your quick help. :) @jxc – marc Oct 22 '20 at 23:45
  • Hi @jxc, quick help: how do I do a case-insensitive search for the keys, i.e. making the map keys lower or upper case? thanks – marc Nov 03 '20 at 19:50
  • @marc, for Spark 3.0+, we can use `transform_keys` to convert all map keys to lower-case; we can do something similar for Spark 2.4+. What is your Spark version? – jxc Nov 03 '20 at 20:04
  • Using 2.3 currently :(. Can you let me know how to do it in 2.3, or how to integrate transform_keys into the above solution for 2.4? thanks – marc Nov 03 '20 at 21:16
  • for spark 2.3, the map/array functions are limited, but I think we can use pandas_udf, let me do some testing and then update the post. – jxc Nov 03 '20 at 21:18
  • Upgraded my local to 2.4. Can you help me out with how to do it? – marc Nov 03 '20 at 22:53
  • @marc, check the end of my updated post, just post-process the map, say `.withColumn("m", F.expr("map_from_entries(transform(map_keys(m), k -> (lower(k), m[k])))"))` – jxc Nov 03 '20 at 22:56
  • n/p, just let me know for any issues. – jxc Nov 03 '20 at 23:03
  • Hi, need quick help. My Spark is 2.3 and when trying your code I get 'PyArrow >=0.8.0' needs to be installed, and it's out of my scope to pip it, as I am not sudo. I am trying to use the map_keys solution and getting an 'extraneous input '>' expecting .... ==\ntransform_keys(m, (k,v) -> lower(k))' error. Is this due to syntax, or is version 2.4 needed? @jxc – marc Nov 04 '20 at 22:05
  • Then try using a udf: `lower_keys = F.udf(lambda x: re.sub(ptn, lambda m: m.group(1).lower(), x) if x else x, "string")`, make sure to `import re` and pre-compile the regex pattern using: `ptn = re.compile(r"(?i)\b({})(?==)".format('|'.join(keys)))`. As mentioned in the post, transform_keys requires Spark 3.0+. – jxc Nov 04 '20 at 22:41
  • Hi @jxc, sorry to bother. How do I extract the same kind of string which has only '=' instead of '=<>'? My input string is " ...... Type=Series VRsomeTest Model=1Ac4 ID=4 sometesthere" . I need to have columns Type, Model, ID. any help – marc Nov 05 '20 at 04:06
  • Hi @marc, you can adjust the pairDelim and keyValueDelim in the str_to_map function, for example: `df.withColumn("m", F.expr("str_to_map(value,'(?i) +(?=type|model|id|conn seq)','=')"))`, where we use the lookahead anchor `(?=..)` to set pairDelim to a SPACE only when followed by one of the keys. You can dynamically generate this using the Python string format method, say: `F.expr("str_to_map(value,'(?i) +(?={})','=')".format('|'.join(keys)))`. Notice that I removed the non-capturing group from the regex pattern in the code of my deleted comment. – jxc Nov 05 '20 at 04:41
  • Hi @jxc, things are getting a little tricky on my end. In the value column, sometimes I will be getting data with =<>, like key=<value>, and sometimes it will be without <>, like key=value. – marc Nov 05 '20 at 05:29
  • @marc, can you add a new question with sample texts? it's a little too late right now and I will have to check this tomorrow. – jxc Nov 05 '20 at 05:33
  • Sure. Thanks for all the help; you went out of your way to help me out. Really appreciate it. Good night. @jxc – marc Nov 05 '20 at 05:43
  • Hello @jxc, created a new question. Can you please take a look if possible? Thanks. https://stackoverflow.com/questions/64704020/spark-extract-columns-from-string – marc Nov 05 '20 at 19:25