
I'm pulling my hair out trying to solve what I feel should be an extremely simple problem, but I'm not sure whether there's some Spark voodoo going on as well.

I have two tables, which are both very small. Table A has about 90K rows and Table B has about 2K rows.

Table A

A  B  C  D
===========
a1 b1 c1 d1
a1 b1 c2 d2
a1 b1 c3 d3
a2 b2 c1 d1
a2 b2 c2 d2
.
.
.

Table B

A  B  E  F
===========
a1 b1 e1 f1
a2 b2 e2 f2

I want a table that looks like

Result Table

A  B  C  D  E  F
=================
a1 b1 c1 d1 e1 f1
a1 b1 c2 d2 e1 f1
a2 b2 c1 d1 e2 f2
.
.
.

I was a little loose with the example, but the idea is that I want to join the table with fewer rows onto the table with more rows, and it's okay for a row of the smaller table to be associated with multiple rows in the final table.

This should be really simple:

table_a.join(table_b, (table_a.a == table_b.a) & (table_a.b == table_b.b)).select(..stuff..)
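
To make the expected behaviour concrete, here is a toy sketch of the join in the pyspark shell, with hard-coded rows standing in for my real tables (which are built elsewhere). On a small sample like this it works exactly as I expect:

# In the pyspark shell, where sqlContext already exists.
table_a = sqlContext.createDataFrame(
    [("a1", "b1", "c1", "d1"),
     ("a1", "b1", "c2", "d2"),
     ("a2", "b2", "c1", "d1")],
    ["a", "b", "c", "d"])

table_b = sqlContext.createDataFrame(
    [("a1", "b1", "e1", "f1"),
     ("a2", "b2", "e2", "f2")],
    ["a", "b", "e", "f"])

# Inner join on both key columns; every row of table_a should pick up e and f.
result = table_a.join(
    table_b,
    (table_a.a == table_b.a) & (table_a.b == table_b.b)
).select(table_a.a, table_a.b, table_a.c, table_a.d, table_b.e, table_b.f)

result.show()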

HOWEVER, for almost all of the rows in the Result Table (which should have about 90K rows, since Table A has about 90K rows), I get null values in columns E and F.

When I save the result of just Table B, I see all the columns and values. When I save the result of just Table A, I see all the columns and values. (i.e. I could do a paper-and-pencil join.)

The weird thing is that even though ~89K rows have null values in columns E and F in the Result Table, there are a few rows that do join correctly, seemingly at random.

Does anyone know what's going on or how I can diagnose this?

confused
  • Sorry, I may be misreading your question. Are you saying that the E and F columns are mostly empty? If so, why are you surprised that they're also mostly empty in a join? Your code looks correct, so I'm wondering if your understanding of the data is where the problem lies. To start, you may want to figure out how many rows from B are even joining onto A (perhaps by doing a join in base Spark). You may also want to consider if you're expecting an [outer join or an inner join](http://stackoverflow.com/a/38578/6157047) and performing the opposite (see the "how" argument in the join docstring). – Galen Long Apr 26 '16 at 04:51
  • @nightingalen Sorry for the confusion. I'm saying that `Table A` and `Table B` are fully populated. Then, when I join them, the columns I expect to have values (namely `E` and `F`) are `null`. I want to associate with each row of `Table A` a row from the smaller table `Table B`, which is fully populated. When I do a single unit test on some mock dataframes, it works, but then when I run in production, I get `null` values. – confused Apr 26 '16 at 05:22
  • I see. Then I would restate what I said: try and figure out how many are even joining at all. You can do this in base spark to distinguish between missing values because the data got lost/something happened, or missing values because there was simply no equivalent table. Maybe you need to do some cleaning, or use a different joining column set. – Galen Long Apr 26 '16 at 05:55
  • @nightingalen I'm not sure what you mean by base spark. Just locally? – confused Apr 26 '16 at 05:56
  • Sorry, by "base Spark" I mean in an RDD instead of a DataFrame. But, I just realized that isn't necessary anyway. Since you're using an inner join, if there simply weren't equivalent keys in each table, you would have a much smaller RDD. I can think of a few more approaches. Are there only a few keys that match onto B, and do most of those rows happen to have missing values? Can you replicate the problem with a smaller sample of the production data? Can you find a specific example of a row that should've joined with full columns but didn't, and trace its execution through the script? – Galen Long Apr 26 '16 at 06:02
  • @nightingalen I must just not be seeing something. When I find a row and trace it through the script, I just don't see what could possibly be wrong. It's very frustrating. I'm trying to perform the transformations locally, but I need to wrangle down the data. And the final dataframe should be the size of `Table A` (the larger dataframe), since `Table B` is effectively joining on a superset of its columns. – confused Apr 26 '16 at 06:06
  • @nightingalen When I take a small sample, use spark shell and join them, everything works as expected and I get about `90K` rows, which is what I expect. None of them have `null` values. So frustrating! – confused Apr 26 '16 at 06:11
  • Apologies, without seeing the code I can't think of another way to help you. Hopefully someone else will have an answer. Good luck. – Galen Long Apr 26 '16 at 06:11
  • 1
    Where is your data coming from? Are you generating columns `E` and `F` on the fly, or in some way differently than the other columns? – David Griffin Apr 26 '16 at 11:59
  • 1
    @DavidGriffin I generate `E` and `F` on the fly. So the flow is: `table_a_df = build_table_a(...)` `table_b_df = build_table_b(...)` <- `E` and `F` are created from existing data `result_df = table_a_df.join(table_b_df, ....)` – confused Apr 26 '16 at 17:22
  • 1
    Any time you can't different r results from a small vs large dataset it's because your code is not serializing correctly- it's how I knew to ask if those were dynamic. Something in the code that creates those columns is not serializing. Has nothing to do with join – David Griffin Apr 26 '16 at 18:39
  • Sorry was driving during that last comment -- pardon the typos! – David Griffin Apr 26 '16 at 22:17
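
Following up on the checks suggested in the comments (how many rows actually find a key match, and whether the generated columns are populated), here is a rough sketch of one way to verify both, assuming `table_a` and `table_b` are the two DataFrames from the question:

from pyspark.sql import functions as F

# Distinct join keys present in the smaller table.
b_keys = table_b.select("a", "b").distinct()

total = table_a.count()
matched = table_a.join(b_keys, ["a", "b"], "inner").count()
print("table_a rows: %d, rows whose (a, b) key exists in table_b: %d" % (total, matched))

# == never matches null keys, so null keys on either side are worth counting.
print("table_b rows with a null key: %d" %
      table_b.where(F.col("a").isNull() | F.col("b").isNull()).count())

# Since e and f are generated on the fly, check that they are actually
# populated in table_b as materialised on the cluster, not just in a local test.
print("table_b rows with null e or f: %d" %
      table_b.where(F.col("e").isNull() | F.col("f").isNull()).count())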

1 Answer


Have you tried <=> instead of == in your join?
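
For context, `<=>` is Spark's null-safe equality operator: NULL <=> NULL evaluates to true, whereas a plain equality comparison between two NULLs does not. PySpark has no literal `<=>` operator on Column objects, so here is a rough sketch of two ways to express it, assuming `table_a` and `table_b` are the DataFrames from the question:

# Spark 2.3+ exposes null-safe equality directly on Column objects:
result = table_a.join(
    table_b,
    table_a.a.eqNullSafe(table_b.a) & table_a.b.eqNullSafe(table_b.b)
)

# On older versions (e.g. the PySpark 1.6 mentioned in the comment below),
# the same join can be written in SQL, where <=> is valid syntax.
# Assumes a SQLContext named sqlContext, as provided by the 1.6 shell.
table_a.registerTempTable("ta")
table_b.registerTempTable("tb")
result = sqlContext.sql("""
    SELECT ta.*, tb.e, tb.f
    FROM ta JOIN tb
      ON ta.a <=> tb.a AND ta.b <=> tb.b
""")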

MatthewH
  • Hi MatthewH, I am looking for an alternative to `<=>` in PySpark. I am using PySpark version 1.6, and using `<=>` gives me a syntax error. Could you please help? – orNehPraka Sep 05 '17 at 18:16