
I have a pyspark dataframe:

Example:

text <String>                 |   name <String>  |   original_name <String>
------------------------------|------------------|--------------------------
HELLOWORLD2019THISISGOOGLE    |   WORLD2019      |   WORLD_2019
NATUREISVERYGOODFOROURHEALTH  |   null           |   null
THESUNCONTAINVITAMIND         |   VITAMIND       |   VITAMIN_D
BECARETOOURHEALTHISVITAMIND   |   OURHEALTH      |   OUR_/HEALTH

I want to loop over the name column and check whether each name value exists in text. If it does, I want to create a new_column that will contain the original_name values of the name values found in text. Note that sometimes the name and original_name columns are null.

Example:

  • in line 4 of the example dataframe, the text contains 2 values from the name column: [OURHEALTH, VITAMIND], so I should take their original_name values and store them in new_column.

  • in line 2, the text contains OURHEALTH from the name column, so I should store the corresponding original_name value in new_column ==> [OUR_/HEALTH]

Expected result:

text <String>                 |   name <String>  |   original_name <String>  |   new_column <Array>
------------------------------|------------------|---------------------------|---------------------------
HELLOWORLD2019THISISGOOGLE    |   WORLD2019      |   WORLD_2019              |   [WORLD_2019]
NATUREISVERYGOODFOROURHEALTH  |   null           |   null                    |   [OUR_/HEALTH]
THESUNCONTAINVITAMIND         |   VITAMIND       |   VITAMIN_D               |   [VITAMIN_D]
BECARETOOURHEALTHISVITAMIND   |   OURHEALTH      |   OUR_/HEALTH             |   [OUR_/HEALTH, VITAMIN_D]

I hope that I was clear in my explanation.

I tried the following code:

from pyspark.sql import functions as F

name_array = df.agg(F.collect_set("name").alias("name_array")).first()["name_array"]
for name_item in name_array:
    # overwrites new_column on each iteration, so only the last match survives
    df = df.withColumn("new_column", F.when(df.text.contains(name_item), F.col("original_name")).otherwise(None))

Can someone help me please? Thank you.


1 Answer

One simple solution is to join the original DataFrame with a derived DataFrame that holds just the search names. Since the join condition can be satisfied by multiple rows, we have to group by the original columns after the join.

Here is a detailed example for your input:

from pyspark.sql.functions import col, collect_list, struct

data = [
    ("HELLOWORLD2019THISISGOOGLE", "WORLD2019", "WORLD_2019"),
    ("NATUREISVERYGOODFOROURHEALTH", None, None),
    ("THESUNCONTAINVITAMIND", "VITAMIND", "VITAMIN_D"),
    ("BECARETOOURHEALTHISVITAMIND", "OURHEALTH", "OUR_ / HEALTH")
]
df = spark.createDataFrame(data, ["text", "name", "original_name"])

# create a new DF with the search words
# as it's the original_name which interests us for the final list, we select it too
search_df = df.select(struct(col("name"), col("original_name")).alias("search_match"))

# left join on df.text contains search_df.name
df_join = df.join(search_df, df.text.contains(search_df["search_match.name"]), "left")

# group by the original columns and collect the matches into a list
df_join.groupBy("text", "name", "original_name")\
    .agg(collect_list(col("search_match.original_name")).alias("new_column"))\
    .show(truncate=False)

Output:

+----------------------------+---------+-------------+--------------------------+
|text                        |name     |original_name|new_column                |
+----------------------------+---------+-------------+--------------------------+
|HELLOWORLD2019THISISGOOGLE  |WORLD2019|WORLD_2019   |[WORLD_2019]              |
|THESUNCONTAINVITAMIND       |VITAMIND |VITAMIN_D    |[VITAMIN_D]               |
|NATUREISVERYGOODFOROURHEALTH|null     |null         |[OUR_ / HEALTH]           |
|BECARETOOURHEALTHISVITAMIND |OURHEALTH|OUR_ / HEALTH|[VITAMIN_D, OUR_ / HEALTH]|
+----------------------------+---------+-------------+--------------------------+
  • Thank you for your answer. It seems to work well in your example. In fact, I'm testing it now: are you sure it works on big data? The build blocked at the first join; it runs without finishing. – verojoucla Dec 09 '19 at 11:30
  • What do you mean by "_it runs without finishing_"? Sure, when you join DataFrames on a "_string contains another string_" condition it will not be performant, but the logic is correct and should not block. Did you look into the Spark job history to see what's going on? – blackbishop Dec 09 '19 at 12:17
  • I mean that the build has been running for more than 1 hour. I examined the job statuses: some tasks finished successfully, and some tasks' statuses change from time to time, from running to expired, then back to running. – verojoucla Dec 09 '19 at 13:07
  • If you have a large DataFrame, you need to do some optimizations, because joining on a string LIKE condition is never performant. Maybe you can try to broadcast the dataframe with the keywords to search, or persist/cache it before the join (see the sketch after these comments)... see also [this](https://stackoverflow.com/questions/43938672/efficient-string-matching-in-apache-spark). – blackbishop Dec 09 '19 at 13:47
  • Do you know if there's a difference between these two joins below? Test_1 ==> `df_join = df1.join(F.broadcast(df2), df1.String.contains(df2["search.subString"]), "left")`: the run goes fast, as per the job status, but the job took 2 hours and did not finish. Test_2 ==> `df_join = F.broadcast(df1).join(F.broadcast(df2), df1.String.contains(df2["search.subString"]), "left")`: the run is very slow; this job has also been running for 2 hours and did not finish. Thank you in advance. – verojoucla Dec 10 '19 at 14:09
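
A minimal sketch of the broadcast variant discussed in the comments above, assuming the same df and search_df as in the answer, and assuming the list of search names is small enough for each executor to hold a full copy. Note that broadcast is only a hint to the optimizer, and a contains condition still forces a non-equi (nested-loop-style) join:

from pyspark.sql import functions as F
from pyspark.sql.functions import col, collect_list

# broadcast the (small) search DataFrame so each executor gets a full copy
# instead of shuffling both sides of the join
df_join = df.join(
    F.broadcast(search_df),
    df.text.contains(search_df["search_match.name"]),
    "left"
)

df_join.groupBy("text", "name", "original_name")\
    .agg(collect_list(col("search_match.original_name")).alias("new_column"))\
    .show(truncate=False)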