I first encountered this behavior in query 47 of the TPC-DS benchmark.
For clarity, here is the query:
--q47.sql--
with v1 as(
select i_category, i_brand,
s_store_name, s_company_name,
d_year, d_moy,
sum(ss_sales_price) sum_sales,
avg(sum(ss_sales_price)) over
(partition by i_category, i_brand,
s_store_name, s_company_name, d_year)
avg_monthly_sales,
rank() over
(partition by i_category, i_brand,
s_store_name, s_company_name
order by d_year, d_moy) rn
from item, store_sales, date_dim, store
where ss_item_sk = i_item_sk and
ss_sold_date_sk = d_date_sk and
ss_store_sk = s_store_sk and
(
d_year = 1999 or
( d_year = 1999-1 and d_moy =12) or
( d_year = 1999+1 and d_moy =1)
)
group by i_category, i_brand,
s_store_name, s_company_name,
d_year, d_moy),
v2 as(
select v1.i_category, v1.i_brand, v1.s_store_name, v1.s_company_name, v1.d_year,
v1.d_moy, v1.avg_monthly_sales ,v1.sum_sales, v1_lag.sum_sales psum,
v1_lead.sum_sales nsum
from v1, v1 v1_lag, v1 v1_lead
where v1.i_category = v1_lag.i_category and
v1.i_category = v1_lead.i_category and
v1.i_brand = v1_lag.i_brand and
v1.i_brand = v1_lead.i_brand and
v1.s_store_name = v1_lag.s_store_name and
v1.s_store_name = v1_lead.s_store_name and
v1.s_company_name = v1_lag.s_company_name and
v1.s_company_name = v1_lead.s_company_name and
v1.rn = v1_lag.rn + 1 and
v1.rn = v1_lead.rn - 1)
select * from v2
where d_year = 1999 and
avg_monthly_sales > 0 and
case when avg_monthly_sales > 0 then abs(sum_sales - avg_monthly_sales) / avg_monthly_sales else null end > 0.1
order by sum_sales - avg_monthly_sales, 3
limit 100
As we can see, the table v1 is used three times in the query:
...
from v1, v1 v1_lag, v1 v1_lead
...
And the graph in the Web UI is the following:
As we can see in the left graph, the number of output rows of the store_sales table is 2,879,578... correction: 2,879,789, which equals the size of the table. However, the right graph shows the number of output rows of the same table as 5,759,578 (exactly twice the table size), and this value propagates to the next operators in the plan, such as the Filter.
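The duplication can also be inspected outside the UI. A minimal sketch, assuming the query text is saved as q47.sql and the TPC-DS tables are already registered in the session, is to print the physical plan and look at how the store_sales scan is shared between the join branches:

// assumes item, store_sales, date_dim and store are registered as tables/views
val q47 = scala.io.Source.fromFile("q47.sql").mkString
spark.sql(q47).explain()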
We can reproduce the same behavior with a simpler query:
// implicits are needed for toDF on a local Seq
import spark.implicits._
// create a temp table for tests
Seq(1, 2, 3).toDF("id").createOrReplaceTempView("t")
// execute the query
spark.sql("""
with v1 as (
select id
from t group by id)
select v1.id, v11.id id1, v12.id id2
from v1, v1 v11, v1 v12
where v1.id = v11.id and
v1.id = v12.id + 1
""").count
The graph for this query is the following:
As we can see, the number of output rows is twice the size of the table. Moreover, if we reference the table v1 one more time, the number of output rows becomes three times the table size, and so on. For example, if we change the query like this
...
select v1.id, v11.id id1, v12.id id2, v13.id id3
from v1, v1 v11, v1 v12, v1 v13
where v1.id = v11.id and
v1.id = v12.id + 1 and
v1.id = v13.id
...
the number of output rows becomes 9, i.e. three times the table size.
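Put together, the four-reference statement looks like this (reusing the temp view t created above):

spark.sql("""
with v1 as (
select id
from t group by id)
select v1.id, v11.id id1, v12.id id2, v13.id id3
from v1, v1 v11, v1 v12, v1 v13
where v1.id = v11.id and
v1.id = v12.id + 1 and
v1.id = v13.id
""").count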
It's worth mentioning that if we use the table v1 only twice, the number of output rows equals the table size. So, with a query like this
...
select v1.id, v11.id id1
from v1, v1 v11
where v1.id = v11.id
...
the number of output rows becomes 3, which is exactly the table size. So with n references to v1, the metric seems to read (n-1) times the table size.
In cases like these, I expected Spark either to scan the table as many times as it is referenced, or to scan it once and reuse the result, but it seems both of my assumptions were wrong.
So, why is the number of output rows higher than the table size?
I've tested this in both Spark 2.2 and 2.3.