
I have two dataframes named brand_name and poi_name.

Dataframe 1 (brand_name):

+-------------+
|brand_stop[0]|
+-------------+
|TOASTMASTERS |
|USBORNE      |
|ARBONNE      |
|USBORNE      |
|ARBONNE      |
|ACADEMY      |
|ARBONNE      |
|USBORNE      |
|USBORNE      |
|PILLAR       |
+-------------+

Dataframe 2 (poi_name):

+---------------------------------------+
|Name                                   |
+---------------------------------------+
|TOASTMASTERS DISTRICT 48               |
|USBORNE BOOKS AND MORE                 |
|ARBONNE                                |
|USBORNE BOOKS AT HOME                  |
|ARBONNE                                |
|ACADEMY, LTD.                          |
|ARBONNE                                |
|USBORNE BOOKS AT HOME                  |
|USBORNE BOOKS & MORE                   |
|PILLAR TO POST HOME INSPECTION SERVICES|
+---------------------------------------+

I want to check whether each string in the brand_stop column of dataframe 1 is contained in the Name column of dataframe 2. The matching should be done row-wise, and whenever a match succeeds, that record should be stored in a new column.

I have tried filtering the dataframe using a join:

from pyspark.sql.functions import udf, col 
from pyspark.sql.types import BooleanType

contains = udf(lambda s, q: q in s, BooleanType())

like_with_python_udf = (poi_names.join(brand_names1)
    .where(contains(col("Name"), col("brand_stop[0]")))
    .select(col("Name")))
like_with_python_udf.show()

But this raises an error:

"AnalysisException: u'Detected cartesian product for INNER join between logical plans"

I am new to PySpark. Please help me with this.

Thank you

Anubhav Sarangi
  • join should do the trick for you – Ramesh Maharjan May 23 '18 at 08:01
  • what about Case sensitivity? – Steven May 23 '18 at 08:14
  • @Steven Consider that the dataframe 1 elements are in uppercase. Please suggest the algorithm after that. – Anubhav Sarangi May 23 '18 at 08:20
  • @RameshMaharjan Join isn't possible between these 2 dataframes as there is no "id" in the 2 dataframes. Please let me know if there is any other way to join. – Anubhav Sarangi May 23 '18 at 08:21
  • In the above example, the output will be the same as Dataframe 2 because all the rows match successfully. But the original dataframes that I am working on have 6000 rows each, and some of those rows may not match. I showed only the first 10 rows of both dataframes to illustrate the problem. So the output dataframe should contain only the rows that match between the 2 dataframes. – Anubhav Sarangi May 23 '18 at 08:38
  • you want to check for contains or matching? and what if two or more rows of df1 matches with one row of df2? – Ramesh Maharjan May 23 '18 at 08:44
  • I want to check for contains. And this should be done one to one row wise mapping between df1 and df2. – Anubhav Sarangi May 23 '18 at 09:00
  • @RameshMaharjan Consider the 1st row of dataframes 1 and 2: dataframe 2 contains TOASTMASTERS in row 1. Consider the 2nd row of dataframes 1 and 2: dataframe 2 contains USBORNE in row 2, and so on... – Anubhav Sarangi May 23 '18 at 09:29
  • didn't you get ideas from https://stackoverflow.com/questions/33168970/how-can-we-join-two-spark-sql-dataframes-using-a-sql-esque-like-criterion? – Ramesh Maharjan May 23 '18 at 09:38
  • @RameshMaharjan Nope I tried all the methods but none of them worked. – Anubhav Sarangi May 23 '18 at 09:41
  • @user8371915 Thanks for sharing the post. I tried all the UDFs but it didn't work. – Anubhav Sarangi May 23 '18 at 09:41
  • According to what you have explained in the comments, you want a row-by-row comparison. So I would suggest generating row numbers for both dataframes, joining on the row numbers, and then checking for a match. Note that this is not an efficient approach in a distributed system – Ramesh Maharjan May 23 '18 at 10:06
  • @RameshMaharjan When I try to join them as suggested in the response below, the error shown is :- "AnalysisException: u'Detected cartesian product for INNER join between logical plans". Please help me with this. – Anubhav Sarangi May 23 '18 at 10:35
  • That's a cartesian join, meaning that every row will be joined with every row of the other dataframe. I don't think you want that – Ramesh Maharjan May 23 '18 at 10:38

2 Answers

The Scala code looks like this:

import org.apache.spark.sql.functions.{col, udf}
// In spark-shell, spark.implicits._ (needed for toDF) is already in scope.

val d1 = Array("TOASTMASTERS", "USBORNE", "ARBONNE", "USBORNE", "ARBONNE", "ACADEMY", "ARBONNE", "USBORNE", "USBORNE", "PILLAR")
val rdd1 = sc.parallelize(d1)
val df1 = rdd1.toDF("brand_stop")

val d2 = Array("TOASTMASTERS DISTRICT 48", "USBORNE BOOKS AND MORE", "ARBONNE", "USBORNE BOOKS AT HOME", "ARBONNE", "ACADEMY, LTD.", "ARBONNE", "USBORNE BOOKS AT HOME", "USBORNE BOOKS & MORE", "PILLAR TO POST HOME INSPECTION SERVICES")
val rdd2 = sc.parallelize(d2)
val df2 = rdd2.toDF("names")

// True when s2 (the full name) contains s1 (the brand).
def matchFunc(s1: String, s2: String): Boolean = s2.contains(s1)
val contains = udf(matchFunc _)

// An explicit crossJoin (Spark 2.1+) avoids the "Detected cartesian product"
// AnalysisException that a join without a condition raises by default in Spark 2.x.
val likeWithUdf = df1.crossJoin(df2)
  .where(contains(col("brand_stop"), col("names")))
  .select(col("brand_stop"), col("names"))
likeWithUdf.show()

The Python code:

from pyspark.sql import Row
from pyspark.sql.functions import udf, col
from pyspark.sql.types import BooleanType

schema1 = Row("brand_stop")
schema2 = Row("names")

df1 = sc.parallelize([
    schema1("TOASTMASTERS"),
    schema1("USBORNE"),
    schema1("ARBONNE")
]).toDF()
df2 = sc.parallelize([
    schema2("TOASTMASTERS DISTRICT 48"),
    schema2("USBORNE BOOKS AND MORE"),
    schema2("ARBONNE"),
    schema2("ACADEMY, LTD."),
    schema2("PILLAR TO POST HOME INSPECTION SERVICES")
]).toDF()

# True when s (the full name) contains q (the brand).
contains = udf(lambda s, q: q in s, BooleanType())

# An explicit crossJoin (Spark 2.1+) avoids the "Detected cartesian product"
# AnalysisException that a join without a condition raises by default in Spark 2.x.
# The full name goes first so that the UDF tests "brand in name".
like_with_python_udf = (df1.crossJoin(df2)
    .where(contains(col("names"), col("brand_stop")))
    .select(col("brand_stop"), col("names")))
like_with_python_udf.show()

I am getting this output:

+------------+
|  brand_stop|
+------------+
|TOASTMASTERS|
|     USBORNE|
|     ARBONNE|
+------------+
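
Because the UDF is only a Python substring test, its argument order can be checked without Spark. The following is a minimal sketch in plain Python, not Spark code: `name_contains_brand` is a hypothetical helper name, and the two short lists are sample values taken from the question. It pairs every brand with every name, as a cross join would, and compares the intended argument order with the swapped one.

```python
from itertools import product

# Sample values from the question (shortened).
brands = ["TOASTMASTERS", "USBORNE", "ARBONNE"]
names = ["TOASTMASTERS DISTRICT 48", "USBORNE BOOKS AND MORE", "ARBONNE", "ACADEMY, LTD."]


def name_contains_brand(name, brand):
    """Return True when the brand occurs as a substring of the full name."""
    return brand in name


# Every (brand, name) pair, filtered by the intended test "brand in name".
matches = [(b, n) for b, n in product(brands, names) if name_contains_brand(n, b)]

# Same pairs with the arguments swapped, i.e. the test "name in brand".
swapped = [(b, n) for b, n in product(brands, names) if name_contains_brand(b, n)]

print(matches)
print(swapped)
```

With the arguments swapped, only pairs where the full name is no longer than the brand can match, so most intended matches are silently dropped rather than raising an error.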

Mugdha
  • Thanks @Mugdha for your response. When I run this(after converting it to Python), the error shown is:- "AnalysisException: u'Detected cartesian product for INNER join between logical plans". Please help me with this. – Anubhav Sarangi May 23 '18 at 10:32
  • Hey @AnubhavSarangi, I have edited answer with python code. Can you check it? – Mugdha May 23 '18 at 11:15

The matching should be done row wise

In that case you have to add some form of index to both dataframes and join on it:

from pyspark.sql.types import LongType, StructField, StructType

def index(df):
    # Append a 0-based _idx column that follows the current row order.
    schema = StructType(df.schema.fields + [StructField("_idx", LongType())])
    rdd = df.rdd.zipWithIndex().map(lambda x: x[0] + (x[1], ))
    return rdd.toDF(schema)

brand_name = spark.createDataFrame(["TOASTMASTERS", "USBORNE"], "string").toDF("brand_stop")
poi_name = spark.createDataFrame(["TOASTMASTERS DISTRICT 48", "USBORNE BOOKS AND MORE"], "string").toDF("poi_name")

# Join on the index, then test each pair. Note that rlike treats brand_stop as a regular expression.
index(brand_name).join(index(poi_name), ["_idx"]).selectExpr("*", "poi_name rlike brand_stop").show()
# +----+------------+--------------------+-------------------------+              
# |_idx|  brand_stop|            poi_name|poi_name RLIKE brand_stop|
# +----+------------+--------------------+-------------------------+
# |   0|TOASTMASTERS|TOASTMASTERS DIST...|                     true|
# |   1|     USBORNE|USBORNE BOOKS AND...|                     true|
# +----+------------+--------------------+-------------------------+