
This is a quote from jaceklaskowski.gitbooks.io.

Some operations, e.g. map, flatMap, filter, don’t preserve partitioning. map, flatMap, filter operations apply a function to every partition.

I don't understand why filter does not preserve partitioning. It just takes the subset of each partition that satisfies a condition, so I think partitions can be preserved. Why isn't it like that?

Hoori M.

2 Answers


You are of course right, and the quote is simply incorrect: `filter` does preserve partitioning (for the reason you've already described), and it is trivial to confirm:

val rdd = sc.range(0, 10).map(x => (x % 3, None)).partitionBy(
  new org.apache.spark.HashPartitioner(11)
)

rdd.partitioner
// Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@b)

val filteredRDD = rdd.filter(_._1 == 3)
filteredRDD.partitioner
// Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@b)

rdd.partitioner == filteredRDD.partitioner
// Boolean = true

This is in contrast to operations like `map`, which don't preserve partitioning (the `Partitioner`):

rdd.map(identity _).partitioner
// Option[org.apache.spark.Partitioner] = None

Datasets are a bit more subtle, as filters are normally pushed down, but overall the behavior is similar.

Alper t. Turker
  • I don't quite understand your reasoning. Even though the RDDs have the same partitioner, does this imply that the partitioning is the same? What does "the partitioning is preserved" even mean? For me, preserving partitioning means that the number of partitions remains the same and no exchange between partitions (shuffling) occurs. But this would also be true for `map` – Raphael Roth May 11 '18 at 10:40
  • @RaphaelRoth [Default Partitioning Scheme in Spark](https://stackoverflow.com/q/34491219/9613318). `preservesPartitioning` means exactly _preserve `Partitioner`_ not _preserve number of partitions_. If you `map` resulting `RDD` won't have `Partitioner` at all. – Alper t. Turker May 11 '18 at 10:43
  • As Raphael mentioned, I also do not understand what `preserving partitioning` means exactly. I have code like this: `data.mapPartitions({iter => iter.flatMap(sth..)}, preservesPartitioning = true);` with this flatMap I get a totally different kind of RDD with a different partitioner, so what does `preservesPartitioning = true` imply? – Hoori M. May 11 '18 at 13:57
  • By setting `preservesPartitioning` to `true` you tell Spark that operation on `RDD` didn't change keys (the first element of the tuple) therefore `Partitioner` of the parent (if present) is still valid for the child. – Alper t. Turker May 11 '18 at 13:59
  • The first `RDD` I have is a `pairedRDD[(Array[Int], Int)]` and with `flatMap` I create `RDD[(Int, Int)]` so keys are changed and the previous partitioner cannot be used. So does Spark really use the parameter of `preservesPartitioning = true` in this case? – Hoori M. May 11 '18 at 14:04
  • `RDD[(Array[Int], Int)]` [cannot use `(Hash)Partitioner`](https://stackoverflow.com/q/32698428/9613318) (unless you define your own) at all, so it will be ignored anyway. But if you change keys on partitioned data while setting `preservesPartitioning = true`, you misinform Spark, and downstream processing can yield incorrect results. So never do that. – Alper t. Turker May 11 '18 at 14:05
  • Thanks for your comment. I changed `Array[Int]` to `Vector[Int]` on which I think default `HashPartitioner` can be used. – Hoori M. May 11 '18 at 14:31
  • It can, but if your code worked before, it means that there is no operation upstream that depends on the hash partitioner anyway. – Alper t. Turker May 11 '18 at 14:33
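The distinction debated in these comments can be illustrated without Spark. Below is a minimal plain-Scala sketch; the `partition` helper and object name are hypothetical, but the helper mirrors the non-negative-modulo scheme that Spark's `HashPartitioner` uses. It shows why `filter` cannot move a record out of its partition, while a key-changing `map` can:

```scala
object PartitionerSketch {
  // Mirrors HashPartitioner's scheme: non-negative modulo of the key's hashCode.
  def partition(key: Any, numPartitions: Int): Int = {
    val mod = key.hashCode % numPartitions
    if (mod < 0) mod + numPartitions else mod
  }

  def main(args: Array[String]): Unit = {
    val numPartitions = 11
    val data = Seq((1, "a"), (2, "b"), (3, "c"))
    val placement = data.map { case (k, _) => k -> partition(k, numPartitions) }.toMap

    // filter keeps keys intact, so every surviving record still belongs to
    // the partition the parent Partitioner assigned it to.
    val filtered = data.filter { case (k, _) => k != 2 }
    val filterOk = filtered.forall { case (k, _) =>
      partition(k, numPartitions) == placement(k)
    }

    // A key-changing map can move records across partition boundaries, so
    // declaring preservesPartitioning = true there would misinform Spark.
    val remapped = data.map { case (k, v) => (k + 100, v) }
    val moved = data.zip(remapped).exists { case ((k1, _), (k2, _)) =>
      partition(k1, numPartitions) != partition(k2, numPartitions)
    }

    println(s"filter keeps placements: $filterOk")  // true
    println(s"key change moves records: $moved")    // true
  }
}
```

This is why setting `preservesPartitioning = true` is a promise about keys, not about partition counts: as long as the keys are unchanged, the parent's placement decisions remain valid.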

`filter` does preserve partitioning; at least, this is suggested by the source code of `filter` (`preservesPartitioning = true`):

  /**
   * Return a new RDD containing only the elements that satisfy a predicate.
   */
  def filter(f: T => Boolean): RDD[T] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[T, T](
      this,
      (context, pid, iter) => iter.filter(cleanF),
      preservesPartitioning = true)
  }
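What that flag actually controls can be seen in `MapPartitionsRDD`, whose `partitioner` is taken from the parent only when `preservesPartitioning` is set. Here is a stripped-down model of that rule (toy stand-in classes, not Spark's real types):

```scala
object PreservesPartitioningModel {
  // Toy stand-ins for Spark's types; the names are illustrative only.
  final case class Partitioner(numPartitions: Int)
  final case class ModelRDD(partitioner: Option[Partitioner])

  // MapPartitionsRDD keeps the parent's Partitioner iff the flag is true;
  // otherwise the child simply has no Partitioner.
  def mapPartitionsModel(parent: ModelRDD, preservesPartitioning: Boolean): ModelRDD =
    ModelRDD(if (preservesPartitioning) parent.partitioner else None)
}
```

Under this rule, `filter` (which passes `true`) yields a child with the same `Partitioner`, while a plain `map` (which passes `false`) yields `None`, matching the REPL output in the accepted answer.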
Alper t. Turker
Raphael Roth