
I am trying to generate all combinations of the unique values within my Spark dataframe. The solution that comes to my mind requires using itertools.product and a pandas dataframe, and is therefore not efficient enough. Here is my code:

import itertools
import pandas as pd

all_date = [i.Date for i in df.select("Date").distinct().collect()]
all_stores_id = [i.ID for i in df.select("ID").distinct().collect()]
all_category = [i.CATEGORY for i in df.select("CATEGORY").distinct().collect()]
combined = [all_date, all_stores_id, all_category]
all_combination_pdf = pd.DataFrame(columns=['Date', 'ID', 'CATEGORY'], data=list(itertools.product(*combined)))
# convert the pandas dataframe to a Spark dataframe
all_combination_df = sqlContext.createDataFrame(all_combination_pdf)
joined = all_combination_df.join(df, ["Date", "ID", "CATEGORY"], how="left")

Is there any way to change this code into a more Spark-idiomatic one?

======EDIT======

I've also tried to implement this functionality using the crossJoin function. Here is the code:

test_df = ((df.select('Date').distinct()).crossJoin(df.select('ID').distinct())).crossJoin(df.select('CATEGORY').distinct())
test_df.show(10)

which, for some unknown reason, raises the following exception:

An error occurred while calling o305.showString.
: java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.lang.Integer.valueOf(Integer.java:832)
user1877600
  • *"some unknown reason"* - that error is pretty clear. You're running out of memory. How many distinct values do you have? – pault Nov 13 '18 at 15:47
  • The choice of the word *unknown* was poor. I understand that the error is due to the memory limitation, but I don't know why it happens. The data sample generates about 1M distinct values and, more importantly, the code implemented using pandas works fine. Do you have any idea how to reimplement the pandas code as an efficient pyspark one? – user1877600 Nov 13 '18 at 20:09

2 Answers


You can generate the dataframe with this. It just creates a dataframe with the unique values of each column and performs a cross join (cartesian product) with the others.

((df.select('Date').distinct()).crossJoin(df.select('ID').distinct())).crossJoin(df.select('CATEGORY').distinct())

It can be put inside a for loop, with some work, to automate it for other dataframes; see the sketch below.
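
As a rough illustration, here is a minimal sketch of that idea, assuming df is the original PySpark dataframe from the question; the column list and the reduce-based chaining are my additions, not part of the original answer:

from functools import reduce

# columns whose distinct values should be combined (example list from the question)
columns = ['Date', 'ID', 'CATEGORY']

# one single-column dataframe of distinct values per column
distinct_dfs = [df.select(c).distinct() for c in columns]

# chain crossJoin calls to build the full cartesian product
all_combinations_df = reduce(lambda left, right: left.crossJoin(right), distinct_dfs)

# optionally join back to the original data, as in the question
joined = all_combinations_df.join(df, columns, how='left')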

Hope this helps

Manrique
  • Thank you for your answer. Unfortunately, for a reason unknown to me, I am not able to execute your line followed by **test_df.show(10)**. The error that I get is *java.lang.OutOfMemoryError: GC overhead limit exceeded*. The project is developed on the Azure platform, so this is definitely not a hardware problem. – user1877600 Nov 13 '18 at 11:45
  • Sorry it didn't help. It's certainly strange to get a memory error on Azure, but think about the huge dataframe that is going to be created. If you have, for example, 3 columns, each of them with 5 different values, you will end up with 5^3 = 125 rows. Imagine with bigger values. – Manrique Nov 13 '18 at 12:13

You can use the readily available cube function to get all the possible combinations of PySpark column values. I am also citing a great answer on this topic in this thread.
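
For reference, a minimal sketch of how cube is typically used; the column names are taken from the question and the count aggregation is just an illustration. Note that cube produces grouping sets, i.e. it also includes subtotal rows where some of the columns are null, rather than only the full cartesian product of distinct values:

from pyspark.sql import functions as F

# groups by every subset of the listed columns, including subtotal rows
# where one or more of the columns are null
df.cube('Date', 'ID', 'CATEGORY').agg(F.count('*').alias('cnt')).show()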

Kay
  • Your answer could be improved with additional supporting information. Please [edit] to add further details, such as citations or documentation, so that others can confirm that your answer is correct. You can find more information on how to write good answers [in the help center](/help/how-to-answer). – Community Feb 20 '23 at 11:52