My question is: does Spark's DataFrame.sample() function make use of predicate pushdown, i.e. sample records before deserializing them?
If it applies such an optimization, the Parquet reader would pick the sampled records first and deserialize only (for example) 10% of the records when fraction=0.1.
val df = sqlContext.read.load("datastore.parquet")
// the next one gets optimized with predicate/filter pushdown and is rather fast on a Parquet dataset
df.filter($"event_type" === "clk").count
/*
== Physical Plan ==
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#158L])
TungstenExchange SinglePartition
TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[currentCount#161L])
Project
Filter (event_type#0 = clk)
Scan ParquetRelation[hdfs://...PARQUET][event_type#0]
*/
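// (hedged) a quick way to double-check the pushdown: explain(true) prints the
// analyzed/optimized/physical plans, and on newer Spark versions the Parquet
// scan line also lists the filters that were actually handed to the reader
df.filter($"event_type" === "clk").explain(true)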
// the question here: is the sample implementation optimized in the same way?
df.sample(false, 0.1).count
/*
== Physical Plan ==
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#153L])
TungstenExchange SinglePartition
TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[currentCount#156L])
Sample 0.0, 0.1, false, -4330576498454505344
Scan ParquetRelation[hdfs://...PARQUET][]
*/
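// (hedged sketch of what I suspect is happening) the Sample operator above
// seems to sit on top of the scan's output, i.e. roughly a per-partition
// Bernoulli filter over rows that have already been read and deserialized:
df.rdd.mapPartitions(it => it.filter(_ => scala.util.Random.nextDouble() < 0.1)).count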
// combined operation
df.filter($"col1"===1).sample(false, 0.1).count
/*
== Physical Plan ==
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#163L])
TungstenExchange SinglePartition
TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[currentCount#166L])
Sample 0.0, 0.1, false, 6556027534253692713
Project
Filter (event_type#0 = clk)
Scan ParquetRelation[hdfs://...PARQUET][event_type#0]
*/
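// (hedged sketch, not a built-in feature) if sample() turns out not to skip any I/O,
// a coarse workaround I'm considering is to read only a random subset of the Parquet
// part-files, so the skipped files are never scanned or deserialized at all; this
// assumes many similarly sized part files with Spark's usual part-NNNNN naming and
// that `sc` is the SparkContext
import org.apache.hadoop.fs.{FileSystem, Path}
import scala.util.Random

val fs = FileSystem.get(sc.hadoopConfiguration)
val partFiles = fs.listStatus(new Path("datastore.parquet"))
  .map(_.getPath.toString)
  .filter(_.contains("part-"))

val sampledFiles = Random.shuffle(partFiles.toList)
  .take(math.ceil(partFiles.length * 0.1).toInt)

// row-level distribution is only approximate, but the untouched files are skipped entirely
sqlContext.read.parquet(sampledFiles: _*).count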