
I would like to apply summary and custom statistics functions to all columns, independently and in parallel.

from pyspark.sql.functions import rand, randn
df = sqlContext.range(0, 10).withColumn('uniform', rand(seed=10)).withColumn('normal', randn(seed=27))

I have tried to look for answers such as here and here, but they seem to give one value for each row. It seems that udf and withColumn could be the building blocks, but I am not sure how to put them together to achieve something similar to this:

df.describe().show()
+-------+------------------+-------------------+--------------------+
|summary|                id|            uniform|              normal|
+-------+------------------+-------------------+--------------------+
|  count|                10|                 10|                  10|
|   mean|               4.5| 0.5215336029384192|-0.01309370117407197|
| stddev|2.8722813232690143|  0.229328162820653|  0.5756058014772729|
|    min|                 0|0.19657711634539565| -0.7195024130068081|
|    max|                 9| 0.9970412477032209|  1.0900096472044518|
+-------+------------------+-------------------+--------------------+

Let's say we have a couple of dummy summary functions:

def mySummary1(df_1_col_only):
    # mean and second_largest are placeholders for arbitrary per-column stats
    return mean(df_1_col_only) * second_largest(df_1_col_only)

def mySummary2(df_1_col_only):
    # tenth_decile and median are placeholders as well
    return tenth_decile(df_1_col_only) / median(df_1_col_only)

so that applying them in parallel to all columns, e.g.

df.something_that_allow_spark_parallel_processing_all_columns(column_list, [mean, mySummary1, mySummary2]) 

would produce (preferably the following, or its transpose):

+--------+------------------+------------------+--------------------+
|col_name|              mean|        mySummary1|          mySummary2|
+--------+------------------+------------------+--------------------+
|      id|                10|                10|                  10|
| uniform|               4.5|0.5215336029384192|-0.01309370117407197|
|  normal|2.8722813232690143| 0.229328162820653|  0.5756058014772729|
+--------+------------------+------------------+--------------------+

I would like to utilize the parallel power of Spark both across the list of columns and within each mySummaryxxx function.

Another question: what should df_1_col_only be for efficiency? A 1-column Spark dataframe? Is there a way to avoid copying/duplicating the original dataframe into a bunch of 1-column dataframes?
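
For concreteness, the closest I can picture is the sketch below, assuming the dummy stats could be rewritten as aggregate Column expressions (max and percentile_approx are only stand-ins for second_largest and the decile, and df/sqlContext are the ones defined above). Building one flat list of aggregate expressions and handing it to a single df.agg() should let Spark compute every (column, stat) pair in one distributed pass, with the single wide result row reshaped locally afterwards:

from pyspark.sql import functions as F
from pyspark.sql import Row

# Aggregate-expression stand-ins for the dummy stats above; the formulas
# (max instead of second_largest, percentile_approx for the decile) are
# placeholders only.
summaries = {
    'mean':       lambda c: F.mean(c),
    'mySummary1': lambda c: F.mean(c) * F.max(c),
    'mySummary2': lambda c: F.expr('percentile_approx({0}, 0.9)'.format(c)) /
                            F.expr('percentile_approx({0}, 0.5)'.format(c)),
}

cols = df.columns
exprs = [fn(c).alias('{0}__{1}'.format(name, c))
         for c in cols for name, fn in summaries.items()]

# One agg() call -> one distributed job; Spark computes every (column, stat)
# pair in the same pass over the data.
wide = df.agg(*exprs).collect()[0].asDict()

# Reshape the single wide row into the "one row per column" layout above.
result = sqlContext.createDataFrame(
    [Row(col_name=c, **{name: wide['{0}__{1}'.format(name, c)] for name in summaries})
     for c in cols])
result.show()

What I cannot see is whether this still works when mySummaryxxx cannot be written as Column expressions, which is really what I am asking.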

Kenny
  • You could union a row to the dataframe returned by describe. Something like: `df.describe().union(df.select(lit("cust_stat1").alias("summary"), *[myCustomerFunction(c).alias(c) for c in df.columns]))` (sketched below these comments). Hard to say more without a [reproducible example](https://stackoverflow.com/questions/48427185/how-to-make-good-reproducible-apache-spark-examples) that shows exactly what types of custom stats you want to compute. – pault Mar 20 '19 at 22:29
  • @pault: Just added some info to my question. I am not sure your approach will parallelize the column processing; the _for_ gives that away. The parallel processing is crucial. – Kenny Mar 21 '19 at 03:00
  • df.describe takes a list of the stats you want it to return, and an empty argument returns all stats. To get aggregate stats in a parallel fashion, use UDAFs; finally, you can union the two dataframes to get a dataframe in the format you want. – ookboy24 Mar 21 '19 at 04:52
  • How would you simulate pandas_udf in Spark <= 2.2 (due to the company's infra)? I would like to process columns in parallel, and within each column use Spark to process rows in parallel. Let's say I have a certain stat that I want to apply per column. collect_list gives a list; is it efficient to convert that into a new Spark DF? `def myFunc(d_list): sdf = spark.createDataFrame(d_list); return sdf.map(...).reduceByKey(...)` and called on two columns: `df.agg(collect_list('col1').alias('col1'), collect_list('col2').alias('col2')).withColumn('col1_', myUdf('col1')).withColumn('col2_', myUdf('col2'))` – Kenny Mar 22 '19 at 15:14
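
To make the union approach from pault's comment concrete, here is a minimal sketch, assuming the custom stat can be written as an aggregate Column expression (the coefficient-of-variation formula below is just a placeholder standing in for the comment's myCustomerFunction):

from pyspark.sql import functions as F

# Placeholder custom aggregate (coefficient of variation); any expression
# that reduces a column to a single value would work here.
def myCustomFunction(c):
    return F.stddev(c) / F.mean(c)

# describe() returns string-typed columns, so cast the custom row to string
# to keep the two schemas union-compatible.
custom_row = df.select(
    [F.lit('cust_stat1').alias('summary')] +
    [myCustomFunction(c).cast('string').alias(c) for c in df.columns])

df.describe().union(custom_row).show()

Note that the Python list comprehension only builds column expressions on the driver; the aggregation itself still runs as a single distributed Spark job, so the for does not serialize the per-column work.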
