-2

I have this sql query which is a left-join and has a select statement in the beginning which chooses from the right table columns as well.. Can you please help to convert it to a spark dataframes and get the result using spark-shell? I don't want to use the sql code in spark instead I want to use dataframes.

I know the join syntax in scala, but I don't know how to choose from the right table (here it is count(w.id2)) when resulting df from left join doesn't have access to the right table's columns.

Thank you!

select count(x.user_id) user_id_count, count(w.id2) current_id2_count
from
    (select
        user_id
    from
        tb1
    where
        year='2021'
        and month=1
        
    ) x
left join
    (select id1, max(id2) id2 from tb2 group by id1) w
on
    x.user_id=w.id1;

In spark I would create two dataframes x and w and join them:

var x = spark.sqlContext.table("tb1").where("year='2021' and month=1")
var w= spark.sqlContext.table("tb2").groupBy("id1").agg(max("id2")).alias("id2"
var joined = x.join(w, x("user_id")===w("id1"), "left")

EDIT : I was confused about the left join. There was some error from the spark that column id2 is not available and I thought it is because the resulting df from left-join will have only left table's columns. However the reason was that when I was choosing max(id2) I had to give it an alias correctly.

Here is a sample and the solution:

var x = Seq("1","2","3","4").toDF("user_id")

var w = Seq (("1", 1), ("1",2), ("3",10),("1",5),("5",4)).toDF("id1", "id2")

var z= w.groupBy("id1").agg(max("id2").alias("id2"))

val xJoinsZ= x.join(z, x("user_id") === z("id1"), "left").select(count(col("user_id").alias("user_id_count")), count(col("id2").alias("current_id2_count")))
scala> x.show(false)
+-------+
|user_id|
+-------+
|1      |
|2      |
|3      |
|4      |
+-------+
scala> z.show(false)
+---+---+                                                                       
|id1|id2|
+---+---+
|3  |10 |
|5  |4  |
|1  |5  |
+---+---+


scala> xJoinsZ.show(false)
+---------------------------------+---------------------------------+
|count(user_id AS `user_id_count`)|count(id2 AS `current_id2_count`)|
+---------------------------------+---------------------------------+
|4                                |2                                |
+---------------------------------+---------------------------------+
RRy
  • 433
  • 1
  • 5
  • 15
  • 4
    I am not sure I understand your problem. What are you trying to do, and what's the problem you're facing? – Oli Oct 27 '21 at 09:19
  • Hi Oli, I am trying to run that sql query using dataframes in spark-shell – RRy Oct 27 '21 at 20:37
  • 1
    Right. I mean you wrote Scala code. What's wrong with it? Is there something you don't manage to do? What's blocking you? – Oli Oct 27 '21 at 21:40
  • I want to do select and join similar to the sql command – RRy Oct 27 '21 at 23:19
  • What's wrong with something like `.select(count(x("user_id")) as "user_id_count", count(w("id2")) as "current_id2_count")` ? – Oli Oct 28 '21 at 07:14
  • since this is a left join, id2 is not there anymore – RRy Oct 28 '21 at 16:07
  • It should still be there. Have you checked ? `joined.printSchema` for instance ? – Oli Oct 28 '21 at 17:31
  • Yes, it is a left join – RRy Oct 29 '21 at 17:27
  • 2
    What does "left join doesn't have access to the right table's columns" mean? Where exactly are you stuck in your coding? [mre] Use enough words, sentences & references to parts of examples to clearly & fully say what you mean. Please (always) clarify via edits, not comments. – philipxy Nov 05 '21 at 00:35
  • A [mre] includes cut & paste & runnable code; example input with desired & actual output (including verbatim error messages); tags & versions; clear specification & explanation. For debug that includes the least code that is code that you show is OK extended by code that you show is not OK. (Debugging fundamental.) For SQL/R/etc include DDL & tabular initialization code. When you get a result you don't expect, pause your overall goal, chop to the 1st subexpression with unexpected result & say what you expected & why, justified by documentation. [ask] PS **Yelling** doesn`t clarify. – philipxy Nov 16 '21 at 01:58
  • @philipxy What's yelling?? I highlighted the part which I thought it is important – RRy Nov 17 '21 at 19:58
  • Please: Answers don't belong in questions, create an answer post for an answer. (You can get the unformatted text by clicking on "edited".) And roll back the inappropriate question post edit. "EDIT"s don't belong in posts, just edit to be the best presentation possible. PS I called your boldface "yelling" because there is no need to have so much text bolded, which is annoying to read; just compose your presentation clearly. [Help] – philipxy Nov 18 '21 at 02:08

1 Answers1

3

Your request is quite difficult to understand, however I am gonna try to reply taking the SQL code you provided as baseline and reproduce it with Spark.

// Reading tb1 (x) and filtering for Jan 2021, selecting only "user_id"
val x: DataFrame = spark.read
 .table("tb1")
 .filter(col("year") === "2021")
 .filter(col("mont") === "01")
 .select("user_id")

// Reading tb2 (w) and for each "id1" getting the max "id2"
val w: DataFrame = spark.read
 .table("tb2")
 .groupBy(col("id1"))
 .max("id2")

// Joining tb1 (x) and tb2 (w) on "user_id" === "id1", then counting user_id and id2
val xJoinsW: DataFrame = x
 .join(w, x("user_id") === w("id1"), "left")
 .select(count(col("user_id").as("user_id_count")), count(col("max(id2)").as("current_id2_count")))

A small but relevant remark, as you're using Scala and Spark, I would suggest you to use val and not var. val means it's final, cannot be reassigned, whereas, var can be reassigned later. You can read more here.

Lastly, feel free to change the Spark reading mechanism with whatever you like.

dadadima
  • 938
  • 4
  • 28