
Referring to a previous answer here that recommends Join to append a column from one table to another: I have been using this method, but I am now reaching its limits for a huge list of tables and rows.

Let's say I have a dataframe of M features: id, salary, age, zone, etc.

+-----+--------+-----+------+-----+
| id  | salary | age | zone | ... |
+-----+--------+-----+------+-----+

I perform certain operations on each feature to arrive at something like this:

+-----+--------+------------+--------------+------------+--------------+--------------+--------------+
| id  | salary | bin_salary | start_salary | end_salary | count_salary | stat1_salary | stat2_salary |
+-----+--------+------------+--------------+------------+--------------+--------------+--------------+

Each feature is processed independently, over the same list of rows:

+-----+--------+--------------+--------------+--------------+
| id  | salary | stat1_salary | stat2_salary | stat3_salary |
+-----+--------+--------------+--------------+--------------+
| 301 | x1     | x            | x            | x            |
| 302 | null   | x            | x            | x            |
| 303 | x3     | x            | x            | x            |
+-----+--------+--------------+--------------+--------------+

+-----+------+-----------+-----------+
| id  | age  | stat1_age | stat2_age |
+-----+------+-----------+-----------+
| 301 | null | x         | x         |
| 302 | x2   | x         | x         |
| 303 | x3   | x         | x         |
+-----+------+-----------+-----------+
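
For illustration, each per-feature table above is produced independently from the base table, roughly along these lines (a simplified sketch; the actual transformations differ per feature, and the table/column logic here is just a placeholder):

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Base table with id, salary, age, zone, ... (name is a placeholder)
df = spark.table("my_db.base_table")

# Attribute table for the salary feature: keep only id + derived columns
salary_attrs = (
    df.select("id", "salary")
      .withColumn("bin_salary",   F.floor(F.col("salary") / F.lit(1000)))        # placeholder binning
      .withColumn("stat1_salary", F.log1p(F.coalesce(F.col("salary"), F.lit(0))))  # placeholder stat
      .withColumn("stat2_salary", F.col("salary") * F.lit(2))                     # placeholder stat
)

# The same pattern is repeated per feature (age, zone, ...), each producing
# its own id + attributes table over the same set of ids.
```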

In the end, I would like to combine them into one final dataframe holding all attributes of every feature, by joining on the unique id across effectively hundreds to thousands of tables, one per feature. This final dataframe is my feature vector:

| id | salary | stat1_salary | stat2_salary | stat3_salary | age | stat1_age | stat2_age |
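
Concretely, the way I build this today is a chain of joins on id, roughly like this minimal sketch (feature_tables stands for the list of per-feature attribute DataFrames):

```python
from functools import reduce

# feature_tables = [salary_attrs, age_attrs, zone_attrs, ...]
# Each DataFrame has the same set of ids plus its own attribute columns.
feature_vector = reduce(
    lambda left, right: left.join(right, on="id", how="inner"),
    feature_tables,
)
```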

I am hitting a memory limit that causes an Out Of Memory exception. Raising executor and driver memory seems to be only a temporary solution, and it is capped by the admin.

JOIN is expensive and limited by resources in PySpark, and I wonder whether it is possible to pre-sort each feature table independently, then keep that order and simply APPEND the entire columns next to one another instead of performing an expensive JOIN. I can manage to keep exactly the same list of rows in every feature table. I hope to avoid any join or lookup, because my set of ids is the same in every table.

How is this achievable? As far as I understand, even if I sort each table by id, Spark distributes the data for storage, and retrieval (if I want to query it back to append) does not guarantee that same order.

Kenny
  • Your question piqued my curiosity. Are you talking about such an arrangement - https://stackoverflow.com/questions/55164274/efficient-way-of-joining-multiple-tables-in-spark – cph_sto Apr 12 '19 at 18:09
  • Indeed. But I have a bigger problem with more features. Imagine each feature spawns a small attribute table with 10 columns; in the end, joining them together creates 10 x hundreds of columns. That hurts. I am having either StackOverFlow or OutOfMemory issues now. I can support only up to a certain number of features before crashing. – Kenny Apr 12 '19 at 18:33
  • I am also being limited by the administrator and can’t really figure out a way from this maze. If you are able to solve this, do compose an answer. – cph_sto Apr 12 '19 at 19:08
  • Please could you clarify how the data is stored beforehand? It sounds like you have M tables of N columns (where a table is e.g. the salary table, and has a bunch of columns related to salary)? What's the format of the input data (e.g, JSON, Parquet..)? – prince Apr 13 '19 at 07:50
  • Hi @Kenny all the datasets have the same schema? The first thing I would think is to use union. – abiratsis Apr 13 '19 at 13:45
  • Data is in Hadoop tabular format. Original table has M features salary, age,region,zone, etc. Each feature is put through some process in pyspark (not necessarily the same, for example categorical feature does not have start/end), to spawn a table of attributes as example for salary; _independently_, to avoid modifying the huge table around one column each. I keep unique Id for final join in the end to string together all _attributes_ of all features. Desired output is Id/salary/attributes_for_salary/age/attributes_for_age/... as feature vector. No Union here because column-wise, not row-wise – Kenny Apr 14 '19 at 22:34
  • This seems to be straightforward joins on a shared key. Problems are likely tied to indexing/ordering. Please give a [mcve]. Please clarify via edits, not comments Please do not link to anything necessary, put whatever is necessary (paraphrase/summary and/or quotes) in your post--make it self-contained. PS Generally speaking with tables it is a bad idea to put data into column names. That should be done as a last step for display. Eg we may want to display like with a matrix with 2 axes & a value at (row, column) but relational manipulation is per a table with columns (row, column, value). – philipxy Apr 15 '19 at 20:19
  • A reproducible example will not be possible because it depends on your particular system config. I suppose you haven't run into this problem. Could you elaborate on your last phrase about 2 axes and 3 axes? – Kenny Apr 16 '19 at 13:38
  • Hello Kenny, did you manage to solve this one without join? – abiratsis May 11 '19 at 09:58
  • Hi Alex, it seems we could not get away without Join. It boils down to optimizing the Join and try to limit the features list as much as possible. – Kenny May 31 '19 at 17:03

1 Answer


There doesn't seem to be a Spark function to append a column from one DataFrame to another directly, other than 'join'.

If you are starting from a single dataframe and trying to generate new features from each of its original columns, I would suggest using 'pandas_udf', where the new features can be appended inside the UDF for all the original columns.

This avoids using 'join' at all. To control memory usage, choose the 'group' column so that each group fits within the executor memory specification.
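
A minimal sketch of the idea, assuming the Spark 2.4-style GROUPED_MAP pandas_udf (on Spark 3.x you would typically use groupBy(...).applyInPandas instead); the derived stats and the grouping column 'zone' are placeholders:

```python
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

# Output schema: the original columns plus every derived feature column
out_schema = ("id long, salary double, age double, zone string, "
              "stat1_salary double, stat2_salary double, "
              "stat1_age double, stat2_age double")

@pandas_udf(out_schema, PandasUDFType.GROUPED_MAP)
def add_features(pdf):
    # pdf is one group as a pandas DataFrame; derive every feature column
    # here, side by side, without any Spark join.
    pdf["stat1_salary"] = pdf["salary"].rank(pct=True)           # placeholder stats
    pdf["stat2_salary"] = pdf["salary"] - pdf["salary"].mean()
    pdf["stat1_age"]    = pdf["age"].rank(pct=True)
    pdf["stat2_age"]    = pdf["age"] - pdf["age"].mean()
    return pdf

# Group on a column chosen so that each group fits in one executor's memory
feature_vector = df.groupBy("zone").apply(add_features)
```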

niuer