Based on your sample, you can convert the string into a map using the Spark SQL function str_to_map and then select the values of the desired map keys (the code below assumes the StringType column is named value):
from pyspark.sql import functions as F
keys = ['Type', 'Model', 'ID', 'conn seq']
df.withColumn("m", F.expr("str_to_map(value, '> *', '=<')")) \
.select("*", *[ F.col('m')[k].alias(k) for k in keys ]) \
.show()
+--------------------+--------------------+---------+-----+---+--------+
| value| m| Type|Model| ID|conn seq|
+--------------------+--------------------+---------+-----+---+--------+
|Type=<Series VR> ...|[Type -> Series V...|Series VR| 1Ac4| 34| 2|
|Type=<SeriesX> Mo...|[Type -> SeriesX,...| SeriesX| 12Q3|231| 3423123|
+--------------------+--------------------+---------+-----+---+--------+
Notes: Here we use the regex pattern > * to split the pairs and the pattern =< to split each key from its value. Check this link if the keys of the map are dynamic and cannot be predefined; just make sure to filter out the EMPTY key (a rough sketch of this is shown below).
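For dynamic keys, a minimal sketch (reusing the same value column and str_to_map call; the names df_m and dyn_keys are just for illustration) collects the distinct map keys at runtime and drops the empty key left over from the trailing >:
from pyspark.sql import functions as F

# build the map column once, then collect its distinct keys
df_m = df.withColumn("m", F.expr("str_to_map(value, '> *', '=<')"))
dyn_keys = [r[0] for r in df_m.select(F.explode(F.map_keys("m"))).distinct().collect()]

# the trailing '>' in each row yields an EMPTY key -> filter it out
dyn_keys = [k for k in dyn_keys if k]

df_m.select("*", *[F.col("m")[k].alias(k) for k in dyn_keys]).show()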
Edit: Based on the comments, to do a case-insensitive search on the map keys with Spark 2.3, we can use a pandas_udf to preprocess the value column before applying the str_to_map function:
Set up the regex pattern for the matched keys (in capturing group 1). Here we use (?i) to make the match case-insensitive, and add two anchors, \b and (?==), so that a matched substring must have a word boundary on its left and be followed by an = sign on its right.
ptn = "(?i)\\b({})(?==)".format('|'.join(keys))
print(ptn)
#(?i)\b(Type|Model|ID|conn seq)(?==)
Set up a pandas_udf so we can use Series.str.replace() with a callback (lowercasing group 1) as the replacement:
lower_keys = F.pandas_udf(lambda s: s.str.replace(ptn, lambda m: m.group(1).lower()), "string")
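As a quick sanity check (outside Spark, purely illustrative), the same replacement can be tried on a plain pandas Series; note that recent pandas versions require regex=True to be passed explicitly when the replacement is a callable:
import pandas as pd

s = pd.Series(["Type=<Series VR> Model=<1Ac4> ID=<34> conn seq=<2>"])
# pandas >= 2.0 no longer treats the pattern as a regex by default
print(s.str.replace(ptn, lambda m: m.group(1).lower(), regex=True)[0])
# type=<Series VR> model=<1Ac4> id=<34> conn seq=<2>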
Convert all matched keys to lowercase:
df1 = df.withColumn('value', lower_keys('value'))
df1.show(truncate=False)
+-------------------------------------------------------+
|value |
+-------------------------------------------------------+
|type=<Series VR> model=<1Ac4> id=<34> conn seq=<2> |
|type=<SeriesX> model=<12Q3> id=<231> conn seq=<3423123>|
+-------------------------------------------------------+
Use str_to_map to create the map, and then use k.lower() as the key to look up the corresponding values:
df1.withColumn("m", F.expr("str_to_map(value, '> *', '=<')")) \
.select("*", *[ F.col('m')[k.lower()].alias(k) for k in keys ]) \
.show()
Note: in case you can use Spark 3.0+ in the future, skip the above steps and use the transform_keys function instead:
df.withColumn("m", F.expr("str_to_map(value, '> *', '=<')")) \
.withColumn("m", F.expr("transform_keys(m, (k,v) -> lower(k))")) \
.select("*", *[ F.col('m')[k.lower()].alias(k) for k in keys ]) \
.show()
For Spark 2.4+, replace transform_keys(...) with the following:
map_from_entries(transform(map_keys(m), k -> (lower(k), m[k])))
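Put together, a Spark 2.4 version of the pipeline might look like this (a sketch under the same column-name and keys assumptions as above):
df.withColumn("m", F.expr("str_to_map(value, '> *', '=<')")) \
    .withColumn("m", F.expr("map_from_entries(transform(map_keys(m), k -> (lower(k), m[k])))")) \
    .select("*", *[ F.col('m')[k.lower()].alias(k) for k in keys ]) \
    .show()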