
I have 2 dataframes.

One dataframe, DF1, has a column whose values are separated by a delimiter, say a comma. The second dataframe, DF2, has a column with a single value (this value may be one of the comma-separated values in DF1). I need to iterate over the DF2 rows and check whether DF2.COLOR exists among the comma-separated values in DF1.csv_column; if it does, add the df1 row ID to a new dataframe.

df1= sqlContext.createDataFrame([("A001","RED, WHITE, BLUE"),("A002","RED, YELLOW"),("A003","GREEN,RED"),("A004","WHITE,YELLOW")], ["id","csv_column"])
df1.show()


df2= sqlContext.createDataFrame([("C1","RED"),("C2","WHITE"),("C3","BLUE"),("C4","YELLOW"),("C5","RED"),("C6","GREEN"),("C7","BLUE")], ["CLRCODE","COLOR"])
df2.show()

+----+----------------+
| id | csv_column     |
+----+----------------+
|A001|RED, WHITE, BLUE|
|A002|RED, YELLOW     |
|A003|GREEN, RED      |
|A004|WHITE, YELLOW   |
+----+----------------+

+-------+-------+ 
|CLRCODE| COLOR | 
+-------+-------+ 
| C1    | RED   | 
| C2    | WHITE | 
| C3    | BLUE  | 
| C4    | YELLOW| 
| C5    | RED   | 
| C6    | GREEN | 
| C7    | BLUE  | 
+-------+-------+

Expected result: the column csv_column in df1 contains RED, WHITE, and BLUE, so I have to add the df1 IDs for RED, WHITE, and BLUE as rows in the new dataframe, and so on. Please note that CLRCODE in DF2 is just a placeholder and is not used otherwise. How can I get this result?

+-------+
|df1.id |
+-------+
|A001   |
|A002   |
|A003   |
|A001   |
|A004   |
|A001   |
|A002   |
|A004   |
|A001   |
|A002   |
|A003   |
|A003   |
|A001   |
+-------+

I checked this SO solution, but there the comma-separated dataframe column is checked against a static string value, whereas I need to iterate through a dataframe that contains many rows with various values.

Thanks for the help.

Ben.T
Yuva
  • I prefer PySpark. Since I am using Databricks, I believe PySpark would be fine. If not PySpark, then yes, pandas will do. Thanks Ben. – Yuva May 23 '19 at 17:41

1 Answer


You can first split and explode the column df1.csv_column, then join the result with df2, and finally groupBy and concat_ws. For example, you can do:

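import pyspark.sql.functions as F

# Split csv_column on commas (plus optional whitespace) and explode to one row per color
df1_exploded = df1.withColumn('color', F.explode(F.split('csv_column', r',\s*')))
# Join each df2 row to the matching df1 rows, then collect the ids per CLRCODE
df_res = (df2.join(df1_exploded, on='color', how='left')
             .groupby('CLRCODE')
             .agg(F.concat_ws(', ', F.collect_list(F.col('id'))).alias('id'))
             .orderBy('CLRCODE').drop('CLRCODE'))
df_res.show()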
+----------------+
|              id|
+----------------+
|A001, A002, A003|
|      A001, A004|
|            A001|
|      A002, A004|
|A001, A002, A003|
|            A003|
|            A001|
+----------------+

EDIT: for the edited expected output, you only need the split, explode, and join steps. Then orderBy and select the column you want:

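import pyspark.sql.functions as F

# Split csv_column on commas (plus optional whitespace) and explode to one row per color
df1_exploded = df1.withColumn('color', F.explode(F.split('csv_column', r',\s*')))
# Join on color, order by CLRCODE then id, and keep only the id column
df_res = (df2.join(df1_exploded, on='color', how='left')
             .orderBy('CLRCODE', 'id').select('id'))
df_res.show()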
+----+
|  id|
+----+
|A001|
|A002|
|A003|
|A001|
|A004|
|A001|
|A002|
|A004|
|A001|
|A002|
|A003|
|A003|
|A001|
+----+
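
If it helps to see what the split, explode, and join steps do without a Spark session, here is a minimal plain-Python sketch of the same logic on the question's sample data. The names df1_rows, df2_rows, explode_colors, and match_ids are made up for illustration and are not part of PySpark. Note that this sketch behaves like an inner join (colors with no match are dropped), whereas the code above uses a left join; on this sample data every color matches, so the result is the same.

```python
import re

# Sample data copied from the question (plain tuples instead of dataframes)
df1_rows = [("A001", "RED, WHITE, BLUE"), ("A002", "RED, YELLOW"),
            ("A003", "GREEN,RED"), ("A004", "WHITE,YELLOW")]
df2_rows = [("C1", "RED"), ("C2", "WHITE"), ("C3", "BLUE"), ("C4", "YELLOW"),
            ("C5", "RED"), ("C6", "GREEN"), ("C7", "BLUE")]


def explode_colors(rows):
    """Mimic split + explode: one (id, color) pair per comma-separated value."""
    return [(row_id, color)
            for row_id, csv in rows
            for color in re.split(r",\s*", csv)]


def match_ids(colors, exploded):
    """Mimic the join on color, then order by (CLRCODE, id) and keep only id."""
    joined = [(code, row_id)
              for code, color in colors
              for row_id, exploded_color in exploded
              if color == exploded_color]
    return [row_id for _, row_id in sorted(joined)]


ids = match_ids(df2_rows, explode_colors(df1_rows))
print(ids)
```

The printed list has one id per matching (CLRCODE, id) pair, in the same order as the table above.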
Ben.T
  • Thank you Ben, sorry if my result set is misrepresented. I just grouped it for better clarity. My requirement is one id per row. Will modify the question. Can you please add your new code in addition to the provided solution. This would be great reference to other members. – Yuva May 23 '19 at 18:09
  • @Yuva you are right it was clear the way you did, I just misunderstood :) – Ben.T May 23 '19 at 18:25
  • 1
    Thanks Ben, have accepted your answer. Will try this. – Yuva May 24 '19 at 00:44