
For example, suppose the data is:

customer = spark.createDataFrame([
    (0, "Bill Chambers"),
    (1, "Matei Zaharia"),
    (2, "Michael Armbrust")])\
  .toDF("customerid", "name")

order = spark.createDataFrame([
    (0, 0, "Product 0"),
    (1, 1, "Product 1"),
    (2, 1, "Product 2"),
    (3, 3, "Product 3"),
    (4, 1, "Product 4")])\
  .toDF("orderid", "customerid", "product_name")

To get customers with orders, I can do it with a left semi join:

customer.join(order, ['customerid'], "left_semi").show()

It returns:

+----------+-------------+
|customerid|         name|
+----------+-------------+
|         0|Bill Chambers|
|         1|Matei Zaharia|
+----------+-------------+

Now, for comparison purposes, I want to add a flag column instead of directly filtering out rows. The desired output would look like this:

+----------+----------------+---------+
|customerid|            name|has_order|
+----------+----------------+---------+
|         0|   Bill Chambers|     true|
|         1|   Matei Zaharia|     true|
|         2|Michael Armbrust|    false|
+----------+----------------+---------+

How can I do it? Is there an elegant way to do it? I tried searching but didn't find anything related; maybe I'm using the wrong keywords?


Is it possible to do it with SQL's EXISTS/IN? See: Spark replacement for EXISTS and IN

ZK Zhao
  • You could use `except` like `customer.except(join_result).withColumn("has_order", lit(False))` and then `union` the result with `join_result.withColumn("has_order", lit(True))`. Or you could select distinct `customerid` from `order`, then do a left join with `customer` and use `when`/`otherwise` with `nvl` to populate `has_order` (a sketch of the first approach follows these comments). – philantrovert May 24 '18 at 13:20
  • @philantrovert yeah... but that's just much more complex compared to `left_semi`'s simplicity. – ZK Zhao May 24 '18 at 13:40
  • @cqcn1991 personally I think join and filter is the better solution, but there's also the option of `union` of `leftsemi` and `leftanti` joins (see my [updated answer](https://stackoverflow.com/a/50510507/5858851)). – pault May 24 '18 at 16:01
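
A rough sketch of philantrovert's first suggestion (in PySpark the method is `subtract`, since `except` is a Python keyword and `df.except(...)` is not valid syntax):

import pyspark.sql.functions as f

# Customers that have at least one order
join_result = customer.join(order, ["customerid"], "left_semi")

# Flag matched customers True, flag the remainder False, then union
join_result.withColumn("has_order", f.lit(True)) \
    .union(customer.subtract(join_result).withColumn("has_order", f.lit(False))) \
    .show()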

1 Answer


You can do a left join, and use pyspark.sql.Column.isNotNull() to create the has_order column based on whether the orderid column is null. Then use distinct() to drop the duplicate rows (a customer with multiple orders would otherwise appear once per matching order).

import pyspark.sql.functions as f
customer.alias("c").join(order.alias("o"), on=["customerid"], how="left")\
    .select(
        "c.*",
        f.col("o.orderid").isNotNull().alias("has_order")
    )\
    .distinct()\
    .show()
#+----------+----------------+---------+
#|customerid|            name|has_order|
#+----------+----------------+---------+
#|         0|   Bill Chambers|     true|
#|         1|   Matei Zaharia|     true|
#|         2|Michael Armbrust|    false|
#+----------+----------------+---------+
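
For the SQL side of the question, the same left join can be expressed in Spark SQL. A sketch, assuming the DataFrames are registered as temp views (`orders` is used to avoid the reserved word ORDER):

customer.createOrReplaceTempView("customer")
order.createOrReplaceTempView("orders")  # ORDER is a reserved word in SQL

spark.sql("""
    SELECT DISTINCT c.customerid,
           c.name,
           o.customerid IS NOT NULL AS has_order
    FROM customer c
    LEFT JOIN orders o
      ON c.customerid = o.customerid
""").show()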

If you wanted to use something similar to the left-semi join you were working with, you can union the results of a left-semi join and a left-anti join:

cust_left_semi = customer.join(order, ['customerid'], "leftsemi")\
    .withColumn('has_order', f.lit(True))
cust_left_semi.show()
#+----------+-------------+---------+
#|customerid|         name|has_order|
#+----------+-------------+---------+
#|         0|Bill Chambers|     true|
#|         1|Matei Zaharia|     true|
#+----------+-------------+---------+

cust_left_anti = customer.join(order, ['customerid'], "leftanti")\
    .withColumn('has_order', f.lit(False))
cust_left_anti.show()
#+----------+----------------+---------+
#|customerid|            name|has_order|
#+----------+----------------+---------+
#|         2|Michael Armbrust|    false|
#+----------+----------------+---------+

cust_left_semi.union(cust_left_anti).show()
#+----------+----------------+---------+
#|customerid|            name|has_order|
#+----------+----------------+---------+
#|         0|   Bill Chambers|     true|
#|         1|   Matei Zaharia|     true|
#|         2|Michael Armbrust|    false|
#+----------+----------------+---------+
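
One thing to note: union resolves columns by position, not by name. Both halves here have the same column order, but if that isn't guaranteed, unionByName (available in Spark 2.3+) is the safer choice:

cust_left_semi.unionByName(cust_left_anti).show()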
pault
  • OK, I'll try to benchmark these 2 solutions and give you some feedback. – ZK Zhao May 24 '18 at 22:53
  • And is it possible to achieve the result through SQL with a simple solution, since IN/EXISTS is supported in SQL? – ZK Zhao May 24 '18 at 22:58
  • As shown in the [post you linked](https://stackoverflow.com/questions/34861516/spark-replacement-for-exists-and-in#), there is currently no support for EXISTS/IN in SparkSQL AFAIK. – pault May 29 '18 at 16:51