
My question is whether the Spark DataFrame.sample() function makes use of predicate pushdown, i.e. samples records before deserializing them.

If it did make such an optimization, the Parquet reader would sample records first and deserialize only (for example) 10% of them when fraction=0.1.

val df = sqlContext.read.load("datastore.parquet")    
// the next one is optimized to use predicate/filter pushdown - and will be rather fast on a Parquet dataset
df.filter($"event_type" === "clk").count
/*
== Physical Plan ==
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#158L])
 TungstenExchange SinglePartition
  TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[currentCount#161L])
   Project
    Filter (event_type#0 = clk)
     Scan ParquetRelation[hdfs://...PARQUET][event_type#0]
*/


// the question here: is the sample implementation optimized in the same way?
df.sample(false, 0.1).count
/*
== Physical Plan ==
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#153L])
 TungstenExchange SinglePartition
  TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[currentCount#156L])
   Sample 0.0, 0.1, false, -4330576498454505344
    Scan ParquetRelation[hdfs://...PARQUET][]
*/

// combined operation
df.filter($"col1"===1).sample(false, 0.1).count
/*
== Physical Plan ==
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#163L])
 TungstenExchange SinglePartition
  TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[currentCount#166L])
   Sample 0.0, 0.1, false, 6556027534253692713
    Project
     Filter (event_type#0 = clk)
      Scan ParquetRelation[hdfs://...PARQUET][event_type#0]
*/
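
To double-check what actually gets optimized, the plans for both orderings of filter and sample can be compared. A quick sketch using the standard explain/queryExecution APIs (event_type is just the column from the plans above):

// print parsed, analyzed, optimized and physical plans for both orderings
df.filter($"event_type" === "clk").sample(false, 0.1).explain(true)
df.sample(false, 0.1).filter($"event_type" === "clk").explain(true)

// the optimized logical plan can also be inspected programmatically;
// if the optimizer pushes the deterministic predicate through Sample,
// the Filter should show up below Sample here - but the Sample itself
// still runs after the Parquet scan
println(df.sample(false, 0.1).filter($"event_type" === "clk").queryExecution.optimizedPlan)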
vvladymyrov
  • Judging by [this unit test](https://github.com/apache/spark/blob/76520955fddbda87a5c53d0a394dedc91dce67e8/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala#L641-L654), Spark seems to be doing it, but I'd like somebody to confirm it – vvladymyrov Oct 12 '15 at 19:46
  • But looking into the physical plan, it looks like Sample happens after scanning the ParquetRelation - so sampling is not pushed down – vvladymyrov Oct 12 '15 at 21:02
  • No, Spark pushes down only the predicates, not limits, order, grouping or any other part of the query. See my answer here: http://stackoverflow.com/a/32585936/1560062 – zero323 Oct 12 '15 at 21:33
  • Thanks for the comment and for confirming my suspicion. I'll try to use a Parquet UDF to push down sampling. Btw @zero323 feel free to post your answer - I'll accept it. – vvladymyrov Oct 12 '15 at 21:40
  • If you look closely at the second plan you will notice that Spark doesn't read any data from the Parquet file and only iterates over each row (because of the count). Similarly, in the third plan it only reads the event_type column. – kostya Oct 13 '15 at 03:54
  • This is an old answer and many things have happened since 2015. I wonder if the answer is still the same, @zero323? – Def_Os Mar 21 '23 at 17:48
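
Following up on the comments above: only the filter predicate (plus column pruning) reaches the Parquet scan, while Sample always runs in Spark on the rows the scan returns, so the sample fraction by itself does not reduce I/O. A coarse workaround sketch, assuming records are spread roughly randomly across the part files (the glob path below is purely illustrative):

// read only a subset of the part files, then sample the rows that were actually read;
// this cuts down what is scanned at the cost of a less rigorous sample
val partialDf = sqlContext.read.parquet("datastore.parquet/part-r-0000*")
partialDf.sample(false, 0.1).count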

0 Answers