AFAIK, there is no way to check if one column is contained within or is a substring of another column directly without using a udf.

However, if you wanted to avoid using a udf, one way is to explode the "Updated" column. Then you can check for equality between the "Serial" column and the exploded "Updated" column and apply your conditions (0 if either value is null or empty, 1 if there is a match, 2 otherwise); call this column "contains". Finally, you can groupBy("ID", "Serial", "Updated") and select the minimum of the "contains" column.
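For reference, the examples below can be reproduced with a DataFrame built like the following. This is a sketch: the sample rows are an assumption inferred from the output shown further down, and spark is an ordinary SparkSession.

from pyspark.sql import SparkSession
import pyspark.sql.functions as f

spark = SparkSession.builder.getOrCreate()

# sample data inferred from the example output below (assumption)
data = [
    (10, "pers1", ""),
    (20, "", ""),
    (30, "entity_1,entity_2,entity_3", "entity_1,entity_3"),
]
df = spark.createDataFrame(data, ["ID", "Serial", "Updated"])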
For example, after the two calls to explode() and checking your condition, you will have a DataFrame like this:
df.withColumn("Serial", f.explode(f.split("Serial", ",")))\
.withColumn("updatedExploded", f.explode(f.split("Updated", ",")))\
.withColumn(
"contains",
f.when(
f.isnull("Serial") |
f.isnull("Updated") |
(f.col("Serial") == "") |
(f.col("Updated") == ""),
0
).when(
f.col("Serial") == f.col("updatedExploded"),
1
).otherwise(2)
)\
.show(truncate=False)
#+---+--------+-----------------+---------------+--------+
#|ID |Serial |Updated |updatedExploded|contains|
#+---+--------+-----------------+---------------+--------+
#|10 |pers1 | | |0 |
#|20 | | | |0 |
#|30 |entity_1|entity_1,entity_3|entity_1 |1 |
#|30 |entity_1|entity_1,entity_3|entity_3 |2 |
#|30 |entity_2|entity_1,entity_3|entity_1 |2 |
#|30 |entity_2|entity_1,entity_3|entity_3 |2 |
#|30 |entity_3|entity_1,entity_3|entity_1 |2 |
#|30 |entity_3|entity_1,entity_3|entity_3 |1 |
#+---+--------+-----------------+---------------+--------+
The "trick" of grouping by ("ID", "Serial", "Updated")
and taking the minimum of "contains"
works because:
- If either
"Serial"
or "Updated"
is null (or equal to empty string in this case), the value will be 0.
- If at least one of the values in
"Updated"
matches with "Serial"
, one of the columns will have a 1.
- If there are no matches, you will have only 2's
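For instance, for ID 30 with Serial entity_1, the exploded rows in the table above have contains values 1 and 2, so the minimum is 1; for entity_2 both rows have a 2, so the minimum is 2.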
The final output:
df.withColumn("Serial", f.explode(f.split("Serial", ",")))\
.withColumn("updatedExploded", f.explode(f.split("Updated", ",")))\
.withColumn(
"contains",
f.when(
f.isnull("Serial") |
f.isnull("Updated") |
(f.col("Serial") == "") |
(f.col("Updated") == ""),
0
).when(
f.col("Serial") == f.col("updatedExploded"),
1
).otherwise(2)
)\
.groupBy("ID", "Serial", "Updated")\
.agg(f.min("contains").alias("contains"))\
.sort("ID")\
.show(truncate=False)
#+---+--------+-----------------+--------+
#|ID |Serial |Updated |contains|
#+---+--------+-----------------+--------+
#|10 |pers1 | |0 |
#|20 | | |0 |
#|30 |entity_3|entity_1,entity_3|1 |
#|30 |entity_2|entity_1,entity_3|2 |
#|30 |entity_1|entity_1,entity_3|1 |
#+---+--------+-----------------+--------+
I'm chaining calls to pyspark.sql.functions.when() to check the conditions. The first part checks whether either column is null or equal to the empty string. I believe you probably only need to check for null in your actual data, but I put in the check for the empty string based on how you displayed your example DataFrame.
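To illustrate the chaining itself, here is a minimal sketch on a hypothetical one-column DataFrame: the conditions are evaluated in order, the first one that matches wins, and otherwise() supplies the fallback.

from pyspark.sql import SparkSession
import pyspark.sql.functions as f

spark = SparkSession.builder.getOrCreate()
toy = spark.createDataFrame([("",), ("a",), ("b",)], ["x"])

toy.withColumn(
    "label",
    f.when(f.col("x") == "", 0)   # checked first
     .when(f.col("x") == "a", 1)  # only reached if the first condition is false
     .otherwise(2)                # fallback for everything else
).show()
# x=""  -> 0, x="a" -> 1, x="b" -> 2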