
Context: My company is on Spark 2.2, so it is not possible to use pandas_udf for distributed column processing.

I have dataframes that contain thousands of columns (features) and millions of records:

df = spark.createDataFrame([(1,"AB", 100, 200,1), (2, "AC", 150,200,2), (3,"AD", 80,150,0)],["Id","Region","Salary", "HouseHoldIncome", "NumChild"])

I would like to perform certain summaries and statistics on each column in a parallel manner, and I wonder what is the best way to achieve this.

# The point: any kind of customized summary can live in my stat1, acting on a
# single-column Spark DataFrame so that it exploits the distributed processing.
# (Pseudocode)
def stat1(df_1_col):
    if (datatype is not numeric):
        return "NA"
    max_df = df_1_col.groupby().max().collect()
    if (max_df > 50):
        return df_1_col.map(....).reduceByKey(...)
    else:
        return get_1st_decile(df_1_col) / df_1_col.agg(mean())
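
For illustration, a minimal runnable sketch of what such a stat1 could look like, assuming the custom summary can be expressed with built-in aggregates. approxQuantile is used here only as a stand-in for get_1st_decile, and the map/reduceByKey branch is left as a placeholder:

    from pyspark.sql import functions as F
    from pyspark.sql.types import NumericType

    def stat1(df_1_col):
        """Custom summary on a single-column DataFrame (sketch only)."""
        c = df_1_col.columns[0]
        # Skip non-numeric columns
        if not isinstance(df_1_col.schema.fields[0].dataType, NumericType):
            return "NA"
        # agg() + first() is an action: it triggers a distributed job
        max_val = df_1_col.agg(F.max(c)).first()[0]
        if max_val > 50:
            # the custom RDD logic (map/reduceByKey) from the pseudocode would go here
            raise NotImplementedError("custom RDD summary")
        # 1st decile divided by the mean, both computed on the cluster
        first_decile = df_1_col.approxQuantile(c, [0.1], 0.01)[0]
        mean_val = df_1_col.agg(F.mean(c)).first()[0]
        return first_decile / mean_val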

I would like to achieve something like this:

    +---------------+------------------+-------------------+--------------------+
    |       col_name|             stat1|              stat2|               stat3|
    +---------------+------------------+-------------------+--------------------+
    |             Id|                10|                 10|                  10|
    |         Salary|               4.5| 0.5215336029384192|-0.01309370117407197|
    |HouseholdIncome|2.8722813232690143|  0.229328162820653|  0.5756058014772729|
    +---------------+------------------+-------------------+--------------------+
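
For reference, a hedged sketch of how per-column results could be collected into that shape on the driver, assuming stat2 and stat3 are hypothetical functions analogous to stat1 and that all three return plain numbers (this is the sequential for-loop approach discussed in question 2 below):

    rows = []
    for c in ["Id", "Salary", "HouseHoldIncome"]:
        df_1_col = df.select(c)                    # one-column DataFrame
        # stat2 / stat3 are placeholders, assumed analogous to stat1
        rows.append((c, float(stat1(df_1_col)),
                        float(stat2(df_1_col)),
                        float(stat3(df_1_col))))

    summary_df = spark.createDataFrame(rows, ["col_name", "stat1", "stat2", "stat3"])
    summary_df.show()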

These are my questions:

1/ How can I achieve this distributed processing without pandas_udf?

2/ In the worst case, I need to use a for loop:

    col_list = ["Id", "Salary", "HouseHoldIncome", "NumChild"]
    for col in col_list:
        ....  # how to call stat1 on this column properly and collect into the final result?

How should we write this properly to achieve the above-mentioned form? To my understanding, .withColumn() and a udf cannot be used here, because that requires collect_list to flatten my column DataFrame into a list and loses the parallel processing power of the Spark DataFrame; not to mention that I have tried collect_list on 10 million records and the resulting list is too much to handle:

.groupBy().agg(stat1_udf(collect_list('data')).alias('data'))

Reference here
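
For what it's worth, here is a hedged sketch of the pattern that avoids collect_list entirely, assuming the statistics can be expressed with Spark's built-in aggregate functions: build one list of aggregate expressions over all columns and run them in a single agg(), i.e. a single distributed pass over the data:

    from pyspark.sql import functions as F

    numeric_cols = ["Id", "Salary", "HouseHoldIncome", "NumChild"]
    exprs = []
    for c in numeric_cols:
        exprs += [F.mean(c).alias(c + "_mean"),
                  F.skewness(c).alias(c + "_skew"),
                  F.kurtosis(c).alias(c + "_kurt")]

    # One action, one job: every column's aggregates are computed in the same pass
    df.agg(*exprs).show()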

3/ If we have to use a for loop, will Spark treat all columns in parallel? According to here, a for loop across columns can still be treated in parallel! But to my understanding, this works because it is row-wise and only transformations are involved. So we can say that at the for-loop step, the row-wise transformations are only added to the DAG without any evaluation. Therefore we have df_col1 -> Transformation(df_col_1), df_col2 -> Transformation(df_col_2), etc. "prepared" in the DAG. At the action step, these will be distributed by the Spark master and treated in parallel.
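
A small illustration of that point, assuming row-wise transformations only: the withColumn calls inside the loop just grow the DAG, and nothing is evaluated until the single action at the end:

    from pyspark.sql import functions as F

    df_t = df
    for c in ["Salary", "HouseHoldIncome"]:
        # transformation only: added to the DAG, not evaluated yet
        df_t = df_t.withColumn(c + "_log", F.log1p(F.col(c)))

    df_t.show()  # the single action that triggers evaluation of everything above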

However, in my case it is a summary that requires reduce, sum, mean, some collect, etc., so each loop iteration/column is forced to evaluate before the next one comes in. The DAG cannot wait; it has to perform df_col1 -> Transformation_and_Action(df_col_1) -> df_col2 -> Transformation_and_Action(df_col_2), making it sequential over thousands of columns.
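
Made explicit, the sequential pattern I am describing looks like this (a sketch): each first()/collect() is an action, so the driver launches one job per column and blocks on it before moving on to the next column:

    from pyspark.sql import functions as F

    results = []
    for c in ["Id", "Salary", "HouseHoldIncome"]:
        # action inside the loop: one Spark job per column, run one after another
        row = df.agg(F.mean(c).alias("mean"), F.max(c).alias("max")).first()
        results.append((c, row["mean"], row["max"]))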

Any takers?

Kenny
  • Because you apply global aggregations, vectorized UDFs wouldn't be of any use to you here anyway - they are not designed for such operations. Additionally, it is not even clear why you need a user-defined function at all - Spark provides a large variety of descriptive statistics out of the box. Nonetheless, if these don't work for you, you should either use proper Scala aggregates or reducing RDD operations. – 10465355 Jun 25 '19 at 19:20
