
I have a DataFrame, so assume my data is in tabular format.

| ID | Serial                       | Updated            |
|----|------------------------------|--------------------|
| 10 | pers1                        |                    |
| 20 |                              |                    |
| 30 | entity_1, entity_2, entity_3 | entity_1, entity_3 |

Now, using withColumn("Serial", explode(split("Serial", ","))), I have broken the column into multiple rows, as below. This was the first part of the requirement.

| ID | Serial   | Updated            |
|----|----------|--------------------|
| 10 | pers1    |                    |
| 20 |          |                    |
| 30 | entity_1 | entity_1, entity_3 |
| 30 | entity_2 | entity_1, entity_3 |
| 30 | entity_3 | entity_1, entity_3 |
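For reference, here is that step as a runnable sketch (df is assumed to hold the table above):

from pyspark.sql import functions as f

# One output row per comma-separated Serial entry; split("") yields [""],
# so rows with an empty Serial survive the explode.
df = df.withColumn("Serial", f.explode(f.split("Serial", ",")))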

Now, for rows where there is no value, the result should be 0. Each value present in the 'Serial' column should be searched for in the 'Updated' column: if the value is present in 'Updated', it should display '1', else '2'.

So in this case, 1 must be displayed for entity_1 and entity_3, and 2 should be displayed for entity_2.

How can I achieve this?

AJm

1 Answer


AFAIK, there is no way to directly check whether one column is contained within, or is a substring of, another column without using a udf.
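For contrast, a minimal sketch of that udf route, applied after the "Serial" column has been exploded (contains_code is a hypothetical helper mirroring the question's 0/1/2 rules, not part of the solution below):

from pyspark.sql import functions as f
from pyspark.sql.types import IntegerType

def contains_code(serial, updated):
    if not serial or not updated:  # null or empty string on either side
        return 0
    # assumes no spaces after commas, matching the answer's display
    return 1 if serial in updated.split(",") else 2

contains_udf = f.udf(contains_code, IntegerType())
df_udf = df.withColumn("contains", contains_udf("Serial", "Updated"))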

However, if you wanted to avoid a udf, one way is to also explode the "Updated" column. Then you can check for equality between the "Serial" column and the exploded "Updated" column and apply your conditions (1 if they match, 2 otherwise); call this column "contains".

Finally, you can groupBy("ID", "Serial", "Updated") and take the minimum of the "contains" column.

For example, after the two calls to explode() and checking your condition, you will have a DataFrame like this:

import pyspark.sql.functions as f

df.withColumn("Serial", f.explode(f.split("Serial", ",")))\
    .withColumn("updatedExploded", f.explode(f.split("Updated", ",")))\
    .withColumn(
        "contains",
        f.when(
            # 0 when either column is null or the empty string
            f.isnull("Serial") |
            f.isnull("Updated") |
            (f.col("Serial") == "") |
            (f.col("Updated") == ""),
            0
        ).when(
            # 1 when this exploded "Updated" value matches "Serial"
            f.col("Serial") == f.col("updatedExploded"),
            1
        ).otherwise(2)
    )\
    .show(truncate=False)
#+---+--------+-----------------+---------------+--------+
#|ID |Serial  |Updated          |updatedExploded|contains|
#+---+--------+-----------------+---------------+--------+
#|10 |pers1   |                 |               |0       |
#|20 |        |                 |               |0       |
#|30 |entity_1|entity_1,entity_3|entity_1       |1       |
#|30 |entity_1|entity_1,entity_3|entity_3       |2       |
#|30 |entity_2|entity_1,entity_3|entity_1       |2       |
#|30 |entity_2|entity_1,entity_3|entity_3       |2       |
#|30 |entity_3|entity_1,entity_3|entity_1       |2       |
#|30 |entity_3|entity_1,entity_3|entity_3       |1       |
#+---+--------+-----------------+---------------+--------+

The "trick" of grouping by ("ID", "Serial", "Updated") and taking the minimum of "contains" works because:

  • If either "Serial" or "Updated" is null (or equal to empty string in this case), the value will be 0.
  • If at least one of the values in "Updated" matches "Serial", one of the rows will have a 1.
  • If there are no matches, you will have only 2s.

The final output:

# Same logic as above, followed by the groupBy/min aggregation:
df.withColumn("Serial", f.explode(f.split("Serial", ",")))\
    .withColumn("updatedExploded", f.explode(f.split("Updated", ",")))\
    .withColumn(
        "contains",
        f.when(
            f.isnull("Serial") |
            f.isnull("Updated") |
            (f.col("Serial") == "") |
            (f.col("Updated") == ""),
            0
        ).when(
            f.col("Serial") == f.col("updatedExploded"),
            1
        ).otherwise(2)
    )\
    .groupBy("ID", "Serial", "Updated")\
    .agg(f.min("contains").alias("contains"))\
    .sort("ID")\
    .show(truncate=False)
#+---+--------+-----------------+--------+
#|ID |Serial  |Updated          |contains|
#+---+--------+-----------------+--------+
#|10 |pers1   |                 |0       |
#|20 |        |                 |0       |
#|30 |entity_3|entity_1,entity_3|1       |
#|30 |entity_2|entity_1,entity_3|2       |
#|30 |entity_1|entity_1,entity_3|1       |
#+---+--------+-----------------+--------+

I'm chaining calls to pyspark.sql.functions.when() to check the conditions. The first part checks whether either column is null or equal to the empty string. You probably only need to check for null in your actual data, but I included the empty-string check based on how you displayed your example DataFrame.
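If you want to reproduce this end-to-end, here is a minimal sketch of building the example DataFrame (the column names and values are taken from the question's tables; empty strings stand in for the blank cells):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(10, "pers1", ""),
     (20, "", ""),
     (30, "entity_1,entity_2,entity_3", "entity_1,entity_3")],
    ["ID", "Serial", "Updated"],
)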

pault