2

I first encountered this behavior in query 47 of the TPC-DS benchmark.

For reference, this is the query:

--q47.sql--
-- TPC-DS query 47, laid out with one select item per line and explicit
-- `as` aliases. Comma joins are kept exactly as in the benchmark text.

  with v1 as (
    -- One row per (category, brand, store, company, year, month): the summed
    -- sales price for that month, the average of those monthly sums within
    -- the same year, and the month's rank inside its
    -- (category, brand, store, company) group ordered by year and month.
    select
      i_category,
      i_brand,
      s_store_name,
      s_company_name,
      d_year,
      d_moy,
      sum(ss_sales_price) as sum_sales,
      avg(sum(ss_sales_price)) over (
        partition by i_category, i_brand, s_store_name, s_company_name, d_year
      ) as avg_monthly_sales,
      rank() over (
        partition by i_category, i_brand, s_store_name, s_company_name
        order by d_year, d_moy
      ) as rn
    from item, store_sales, date_dim, store
    where ss_item_sk = i_item_sk
      and ss_sold_date_sk = d_date_sk
      and ss_store_sk = s_store_sk
      -- all of 1999, plus December of the year before and January of the year after
      and (
        d_year = 1999
        or (d_year = 1999 - 1 and d_moy = 12)
        or (d_year = 1999 + 1 and d_moy = 1)
      )
    group by
      i_category,
      i_brand,
      s_store_name,
      s_company_name,
      d_year,
      d_moy
  ),
  v2 as (
    -- Three-way self-join of v1: within the same group, v1_lag is the row
    -- ranked immediately before the current row and v1_lead the row ranked
    -- immediately after it.
    select
      v1.i_category,
      v1.i_brand,
      v1.s_store_name,
      v1.s_company_name,
      v1.d_year,
      v1.d_moy,
      v1.avg_monthly_sales,
      v1.sum_sales,
      v1_lag.sum_sales as psum,
      v1_lead.sum_sales as nsum
    from v1, v1 v1_lag, v1 v1_lead
    where v1.i_category = v1_lag.i_category
      and v1.i_category = v1_lead.i_category
      and v1.i_brand = v1_lag.i_brand
      and v1.i_brand = v1_lead.i_brand
      and v1.s_store_name = v1_lag.s_store_name
      and v1.s_store_name = v1_lead.s_store_name
      and v1.s_company_name = v1_lag.s_company_name
      and v1.s_company_name = v1_lead.s_company_name
      and v1.rn = v1_lag.rn + 1
      and v1.rn = v1_lead.rn - 1
  )
  -- Every column of v2, in v2's own order (what `select *` returned).
  select
    i_category,
    i_brand,
    s_store_name,
    s_company_name,
    d_year,
    d_moy,
    avg_monthly_sales,
    sum_sales,
    psum,
    nsum
  from v2
  where d_year = 1999
    and avg_monthly_sales > 0
    -- keep rows whose sales deviate from the yearly average by more than 10%
    and case
          when avg_monthly_sales > 0
            then abs(sum_sales - avg_monthly_sales) / avg_monthly_sales
          else null
        end > 0.1
  order by
    sum_sales - avg_monthly_sales,
    s_store_name  -- third output column, previously referenced by ordinal 3
  limit 100

As we can see, the table v1 is used three times in the query:

...
from v1, v1 v1_lag, v1 v1_lead
...

The graph in the Web UI is the following:

enter image description here

As we can see in the left graph, the number of output rows of table store_sales is 2,879,789, which is equal to the size of the table.

However, the right graph shows that the number of output rows of the same table is 5,759,578 (exactly twice the table size), and this value propagates to the downstream operators such as Filter.

We can achieve the same result with a simpler query.

// register a three-row temp view named "t" to run the test against
val ids = Seq(1, 2, 3).toDF("id")
ids.createOrReplaceTempView("t")

// the CTE v1 is referenced three times (v1, v11, v12) in the same FROM clause
val selfJoinQuery = """
with v1 as (
  select id
  from t group by id)
select v1.id, v11.id id1, v12.id id2
from v1, v1 v11, v1 v12
where v1.id = v11.id and
v1.id = v12.id + 1
"""

// run the query; count forces execution so the plan shows up in the Web UI
spark.sql(selfJoinQuery).count

The graph of this query is the following

enter image description here

As we can see, the number of output rows is twice the size of the table. Moreover, if we add table v1 one more time, the number of output rows becomes three times the size of the table, and so on.

For example if we change the query like this

...
select v1.id, v11.id id1, v12.id id2, v13.id id3
from v1, v1 v11, v1 v12, v1 v13
where v1.id = v11.id and
v1.id = v12.id + 1 and
v1.id = v13.id
...

the number of output rows becomes 9.

It's worth mentioning that if we use the table v1 only twice, the number of output rows equals the table size.

So, with the query like this

...
select v1.id, v11.id id1
from v1, v1 v11
where v1.id = v11.id
...

the number of output rows becomes 3.

In cases like these I was expecting Spark either to load the table as many times as it is needed, or to load it once and reuse it wherever it is needed, but it seems that both of my assumptions were wrong.

So, why is the number of output rows higher than the table size?

I've tested this in both Spark 2.2 and 2.3.

Panos
  • 557
  • 5
  • 13

1 Answer

0

I think this metric means something else. I did a simple test to understand what it means.

/**
 * Small experiment: cache a customer data set, join it against a short user
 * list with both an outer and an inner join, and keep the JVM alive so the
 * row counts can be inspected in the Spark UI.
 */
public class TestCode2 {

    /**
     * Reads both CSV inputs, caches the customer data, runs the outer and the
     * inner join, and then sleeps so the UI stays reachable.
     *
     * @param args unused
     * @throws InterruptedException if the final sleep is interrupted
     */
    public static void main(String[] args) throws InterruptedException {
        SparkSession spark = getSparkSession();
        Column accountId = new Column("account_id");

        // Customer data: rows without an account id are dropped, then the data
        // is repartitioned by account id and sorted inside each partition.
        Dataset<Row> accounts = spark.read()
                .format("csv")
                .option("header", "true")
                .schema(minimumCustomerDataSchema())
                .load("info2.csv")
                .filter(accountId.isNotNull())
                .repartition(accountId)
                .sortWithinPartitions(accountId);

        // User list, spread over 30 partitions.
        Dataset<Row> users = spark.read()
                .format("csv")
                .option("header", "true")
                .schema(minimumUserListSchema())
                .load("old.csv")
                .repartition(30);

        // Cache the customer data and run a first count on it.
        accounts = accounts.cache();
        System.out.println("userdetails count="+accounts.count());

        Dataset<Row> outerJoined = outerJoinWithAccountToUserIdMap(users, accounts);
        Dataset<Row> innerJoined = innerJoinWithAccountToUserIdMap(users, accounts);
        persist(outerJoined);
        persist(innerJoined);
        System.out.println("userdetails count="+accounts.count());

        // Keep the application alive so that you can visit the UI and see how
        // the executor performed.
        Thread.sleep(1000000);
    }

    /**
     * Returns the application's SparkSession, creating it on first use.
     * It runs on a local master with spark.sql.autoBroadcastJoinThreshold set
     * to -1 (per Spark's configuration docs this turns automatic broadcast
     * joins off).
     */
    public static SparkSession getSparkSession() {
        SparkSession.Builder builder = SparkSession.builder()
                .appName("Customer Aggregation pipeline")
                .master("local")
                .config("spark.sql.autoBroadcastJoinThreshold", "-1");
        return builder.getOrCreate();
    }

    /**
     * Collapses the data set into a single partition and prints every row.
     * Despite the name, nothing is cached or written to storage here.
     *
     * @param dataSet the data set to evaluate and dump
     */
    public static void persist(Dataset<Row> dataSet) {
        Dataset<Row> singlePartition = dataSet.repartition(1);
        singlePartition.foreachPartition((ForeachPartitionFunction<Row>) rows ->
                // do something with each row; here it is only printed
                rows.forEachRemaining(row -> System.out.println("dumping="+row)));
    }

    /** Schema of info2.csv: a non-nullable account_id and a nullable name, both strings. */
    public static StructType minimumCustomerDataSchema() {
        StructField accountId = DataTypes.createStructField("account_id", DataTypes.StringType, false);
        StructField name = DataTypes.createStructField("name", DataTypes.StringType, true);
        return DataTypes.createStructType(new StructField[]{accountId, name});
    }

    /** Schema of old.csv: a single nullable string column, userid. */
    public static StructType minimumUserListSchema() {
        StructField userId = DataTypes.createStructField("userid", DataTypes.StringType, true);
        return DataTypes.createStructType(new StructField[]{userId});
    }

    /** Outer join of the user list with the account data on userid = account_id. */
    private static Dataset<Row> outerJoinWithAccountToUserIdMap(Dataset<Row> latestRunDataset, Dataset<Row> accountToUserIdDataset) {
        return joinOnUserId(latestRunDataset, accountToUserIdDataset, "outer");
    }

    /** Inner join of the user list with the account data on userid = account_id. */
    private static Dataset<Row> innerJoinWithAccountToUserIdMap(Dataset<Row> latestRunDataset, Dataset<Row> accountToUserIdDataset) {
        return joinOnUserId(latestRunDataset, accountToUserIdDataset, "inner");
    }

    /** Joins the two data sets on userid = account_id using the given join type. */
    private static Dataset<Row> joinOnUserId(Dataset<Row> latestRunDataset, Dataset<Row> accountToUserIdDataset, String joinType) {
        Column userMatchesAccount = latestRunDataset.col("userid")
                .equalTo(accountToUserIdDataset.col("account_id"));
        return latestRunDataset.join(accountToUserIdDataset, userMatchesAccount, joinType);
    }
}

info2.csv

account_id,name
1,a1
2,a2
3,a3
4,a4
5,a5
6,a6
7,a7
8,a8
9,a9
10,a10
11,a11
12,a12
13,a13
14,a14
15,a15
16,a16
17,a17
18,a18
19,a19
20,a20
21,a21
22,a22
23,a23
24,a24
25,a25
26,a26
27,a27
28,a28
29,a29
30,a30
31,a31

old.csv

userid
1
2
3
4
5
6
7
8

In the above script, here are the row counts that I got:

during count = 31
during outerjoin = 31
during innerjoin = 8
best wishes
  • 5,789
  • 1
  • 34
  • 59