
We are using the PySpark libraries interfacing with Spark 1.3.1.

We have two DataFrames, documents_df := {document_id, document_text} and keywords_df := {keyword}. We would like to join the two DataFrames and return a resulting DataFrame with {document_id, keyword} pairs, using the criterion that keywords_df.keyword appears in the documents_df.document_text string.

In PostgreSQL, for example, we could achieve this using an ON clause of the form:

documents_df.document_text ILIKE '%' || keywords_df.keyword || '%'

In PySpark, however, I cannot get any form of join syntax to work. Has anybody achieved something like this before?

With kind regards,

Will

  • You could close the question by accepting the answer, which will encourage others to answer questions! You can also keep it open and update the question if you still have questions :) – WoodChopper Oct 29 '15 at 12:02

2 Answers

20

It is possible in two different ways but, generally speaking, it is not recommended. First, let's create some dummy data:

from pyspark.sql import Row

document_row = Row("document_id", "document_text")
keyword_row = Row("keyword") 

documents_df = sc.parallelize([
    document_row(1L, "apache spark is the best"),
    document_row(2L, "erlang rocks"),
    document_row(3L, "but haskell is better")
]).toDF()

keywords_df = sc.parallelize([
    keyword_row("erlang"),
    keyword_row("haskell"),
    keyword_row("spark")
]).toDF()
  1. Hive UDFs

    documents_df.registerTempTable("documents")
    keywords_df.registerTempTable("keywords")
    
    query = """SELECT document_id, keyword
        FROM documents JOIN keywords
        ON document_text LIKE CONCAT('%', keyword, '%')"""
    
    like_with_hive_udf = sqlContext.sql(query)
    like_with_hive_udf.show()
    
    ## +-----------+-------+
    ## |document_id|keyword|
    ## +-----------+-------+
    ## |          1|  spark|
    ## |          2| erlang|
    ## |          3|haskell|
    ## +-----------+-------+
    
  2. Python UDF

    from pyspark.sql.functions import udf, col 
    from pyspark.sql.types import BooleanType
    
    # Or you can replace `in` with a regular expression
    contains = udf(lambda s, q: q in s, BooleanType())
    
    like_with_python_udf = (documents_df.join(keywords_df)
        .where(contains(col("document_text"), col("keyword")))
        .select(col("document_id"), col("keyword")))
    like_with_python_udf.show()
    
    ## +-----------+-------+
    ## |document_id|keyword|
    ## +-----------+-------+
    ## |          1|  spark|
    ## |          2| erlang|
    ## |          3|haskell|
    ## +-----------+-------+
    

Why is this not recommended? Because in both cases it requires a Cartesian product:

like_with_hive_udf.explain()

## TungstenProject [document_id#2L,keyword#4]
##  Filter document_text#3 LIKE concat(%,keyword#4,%)
##   CartesianProduct
##    Scan PhysicalRDD[document_id#2L,document_text#3]
##    Scan PhysicalRDD[keyword#4]

like_with_python_udf.explain()

## TungstenProject [document_id#2L,keyword#4]
##  Filter pythonUDF#13
##   !BatchPythonEvaluation PythonUDF#<lambda>(document_text#3,keyword#4), ...
##    CartesianProduct
##     Scan PhysicalRDD[document_id#2L,document_text#3]
##     Scan PhysicalRDD[keyword#4]

There are other ways to achieve a similar effect without a full Cartesian product:

  1. Join on the tokenized document - useful if the keywords list is too large to be handled in the memory of a single machine

    from pyspark.ml.feature import Tokenizer
    from pyspark.sql.functions import explode
    
    tokenizer = Tokenizer(inputCol="document_text", outputCol="words")
    
    tokenized = (tokenizer.transform(documents_df)
        .select(col("document_id"), explode(col("words")).alias("token")))
    
    like_with_tokenizer = (tokenized
        .join(keywords_df, col("token") == col("keyword"))
        .drop("token"))
    
    like_with_tokenizer.show()
    
    ## +-----------+-------+
    ## |document_id|keyword|
    ## +-----------+-------+
    ## |          3|haskell|
    ## |          1|  spark|
    ## |          2| erlang|
    ## +-----------+-------+
    

    This requires a shuffle but not a Cartesian product:

    like_with_tokenizer.explain()
    
    ## TungstenProject [document_id#2L,keyword#4]
    ##  SortMergeJoin [token#29], [keyword#4]
    ##   TungstenSort [token#29 ASC], false, 0
    ##    TungstenExchange hashpartitioning(token#29)
    ##     TungstenProject [document_id#2L,token#29]
    ##      !Generate explode(words#27), true, false, [document_id#2L, ...
    ##       ConvertToSafe
    ##        TungstenProject [document_id#2L,UDF(document_text#3) AS words#27]
    ##         Scan PhysicalRDD[document_id#2L,document_text#3]
    ##   TungstenSort [keyword#4 ASC], false, 0
    ##    TungstenExchange hashpartitioning(keyword#4)
    ##     ConvertToUnsafe
    ##      Scan PhysicalRDD[keyword#4]
    
  2. Python UDF and broadcast variable - useful if the keywords list is relatively small

    from pyspark.sql.types import ArrayType, StringType
    
    keywords = sc.broadcast(set(
        keywords_df.map(lambda row: row[0]).collect()))
    
    bd_contains = udf(
        lambda s: list(set(s.split()) & keywords.value), 
        ArrayType(StringType()))
    
    
    like_with_bd = (documents_df.select(
        col("document_id"), 
        explode(bd_contains(col("document_text"))).alias("keyword")))
    
    like_with_bd.show()
    
    ## +-----------+-------+
    ## |document_id|keyword|
    ## +-----------+-------+
    ## |          1|  spark|
    ## |          2| erlang|
    ## |          3|haskell|
    ## +-----------+-------+
    

    It requires neither a shuffle nor a Cartesian product, but you still have to transfer the broadcast variable to each worker node.

    like_with_bd.explain()
    
    ## TungstenProject [document_id#2L,keyword#46]
    ##  !Generate explode(pythonUDF#47), true, false, ...
    ##   ConvertToSafe
    ##    TungstenProject [document_id#2L,pythonUDF#47]
    ##     !BatchPythonEvaluation PythonUDF#<lambda>(document_text#3), ...
    ##      Scan PhysicalRDD[document_id#2L,document_text#3]
    
  3. Since Spark 1.6.0 you can mark a small DataFrame with `sql.functions.broadcast` to get an effect similar to the above without using UDFs or explicit broadcast variables. Reusing the tokenized data:

    from pyspark.sql.functions import broadcast
    
    like_with_tokenizer_and_bd = (broadcast(tokenized)
        .join(keywords_df, col("token") == col("keyword"))
        .drop("token"))
    
    like_with_tokenizer_and_bd.explain()
    
    ## TungstenProject [document_id#3L,keyword#5]
    ##  BroadcastHashJoin [token#10], [keyword#5], BuildLeft
    ##   TungstenProject [document_id#3L,token#10]
    ##    !Generate explode(words#8), true, false, ...
    ##     ConvertToSafe
    ##      TungstenProject [document_id#3L,UDF(document_text#4) AS words#8]
    ##       Scan PhysicalRDD[document_id#3L,document_text#4]
    ##   ConvertToUnsafe
    ##    Scan PhysicalRDD[keyword#5]
    


zero323
  • This is an excellent and helpful reply. Thank you for taking the time to write something so comprehensive. Not only have you answered my question, but I learned quite a few other things that I didn't know you could do. I'm going to use the variable broadcast method, since the keyword list will be small. Problem solved! – Will Hardman Oct 19 '15 at 09:40
  • OK. One problem, @zero323. The `explode()` function was introduced only in Spark 1.4. I'm stuck (for now) with 1.3.1. Is it possible to embed the UDF into a `map()` function to return multiple rows (i.e. one per matching keyword) for each row of input? – Will Hardman Oct 19 '15 at 12:24
  • Solved it! For reference: `like_with_bd= documents_df.select( col("document_id"), bd_contains(col("document_text")).alias("keyword")).flatMap(lambda row: [(kw, row[0]) for kw in row[1]])` – Will Hardman Oct 19 '15 at 12:38
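
For readers stuck on Spark 1.3.x (no `explode`), here is a rough, hedged sketch expanding the flatMap workaround from the comment above. It assumes the `bd_contains` UDF and `keywords` broadcast variable from option 2; `DataFrame.flatMap` (removed in Spark 2.0) stands in for `explode()`, and the variable and column names are purely illustrative:

    # Sketch only: flag the matching keywords per document with the Python UDF,
    # then fan each list out into one (document_id, keyword) pair per match.
    matched = documents_df.select(
        col("document_id"),
        bd_contains(col("document_text")).alias("keywords"))

    # row[0] is document_id, row[1] is the list of matched keywords.
    pairs = matched.flatMap(lambda row: [(row[0], kw) for kw in row[1]])

    # Back to a DataFrame with the desired {document_id, keyword} schema.
    like_with_flatmap = pairs.toDF(["document_id", "keyword"])
    like_with_flatmap.show()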
0

A precise way of doing this is shown below (a bit slow, but accurate):

import numpy as np

from pyspark.sql import Row
from pyspark.sql.functions import udf, col
from pyspark.sql.types import BooleanType


def string_match_percentage(col_1, col_2, confidence):
    # Edit-distance-based similarity: insertions/deletions cost 1,
    # substitutions cost 2. Returns True if the similarity percentage
    # is at least `confidence`.
    s = col_1.lower()
    t = col_2.lower()

    rows = len(s) + 1
    cols = len(t) + 1
    array_difference = np.zeros((rows, cols), dtype=int)

    # Distance from the empty string: i deletions / k insertions.
    for i in range(1, rows):
        array_difference[i][0] = i
    for k in range(1, cols):
        array_difference[0][k] = k

    for c in range(1, cols):
        for r in range(1, rows):
            cost = 0 if s[r - 1] == t[c - 1] else 2
            array_difference[r][c] = min(array_difference[r - 1][c] + 1,
                                         array_difference[r][c - 1] + 1,
                                         array_difference[r - 1][c - 1] + cost)

    # The bottom-right cell holds the distance between the full strings.
    distance = array_difference[rows - 1][cols - 1]
    match_percentage = ((len(s) + len(t)) - distance) / (len(s) + len(t)) * 100
    return match_percentage >= confidence


document_row = Row("document_id", "document_text")
keyword_row = Row("keyword")

documents_df = sc.parallelize([
    document_row(1, "google llc"),
    document_row(2, "blackfiled llc"),
    document_row(3, "yahoo llc")
]).toDF()

keywords_df = sc.parallelize([
    keyword_row("yahoo"),
    keyword_row("google"),
    keyword_row("apple")
]).toDF()

conditional_contains = udf(lambda s, q: string_match_percentage(s, q, confidence=70), BooleanType())

like_joined_df = (documents_df.crossJoin(keywords_df)
                        .where(conditional_contains(col("document_text"), col("keyword")))
                        .select(col("document_id"), col("keyword"), col("document_text")))
like_joined_df.show()

Output:

# +-----------+-------+-------------+
# |document_id|keyword|document_text|
# +-----------+-------+-------------+
# |          1| google|   google llc|
# |          3|  yahoo|    yahoo llc|
# +-----------+-------+-------------+