Here is a log from a spark-shell session with a similar scenario.
Given
scala> persons
res8: org.apache.spark.sql.DataFrame = [name: string, age: int]
scala> persons.first
res7: org.apache.spark.sql.Row = [Justin,19]
Your issue looks like
scala> persons.map(t => println(t))
res4: org.apache.spark.rdd.RDD[Unit] = MapPartitionsRDD[10]
so map just returns another RDD (the function is not applied immediately; it is applied "lazily", when you actually iterate over the result).
So when you materialize the data (using collect()) you get a "normal" collection:
scala> persons.collect()
res11: Array[org.apache.spark.sql.Row] = Array([Justin,19])
over which you can map. Note that in this case the closure passed to map has a side effect (the println), and the result of println is Unit:
scala> persons.collect().map(t => println(t))
[Justin,19]
res5: Array[Unit] = Array(())
The result is the same if collect is applied at the end:
scala> persons.map(t => println(t)).collect()
[Justin,19]
res19: Array[Unit] = Array(())
But if you just want to print the rows, you can simplify this by using foreach:
scala> persons.foreach(t => println(t))
[Justin,19]
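As @RohanAletty has pointed out in a comment, this works for a local Spark job. If the job runs in a cluster, the println calls are executed on the executors, so their output does not appear on the driver console; in that case collect is required as well: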
persons.collect().foreach(t => println(t))
Notes
- The same behaviour can be observed in the Iterator class.
- The output of the session above has been reordered.
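The Iterator behaviour mentioned in the notes can be tried without Spark. The following is a minimal sketch in plain Scala; the names rows, seen, mapped and units are made up for illustration, and a ListBuffer stands in for println so the side effects can be inspected:

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical sample data standing in for the rows of persons.
val rows = List("Justin,19", "Michael,29", "Andy,30")

// Records every element the side-effecting function has touched.
val seen = ListBuffer.empty[String]

// map on an Iterator is lazy: this only wraps the iterator, nothing runs yet.
val mapped = rows.iterator.map { r => seen += r; () }
val seenBeforeConsuming = seen.size

// Consuming the iterator finally applies the function; the results are Units.
val units = mapped.toList
val seenAfterConsuming = seen.size

// foreach is eager and returns Unit, so it suits pure side effects better.
rows.iterator.foreach(r => seen += r)
```

As with the RDD above, the function passed to map is only applied once the result is consumed, and what comes back is a collection of Unit values rather than anything useful.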
Update
As for filtering: the placement of collect is "bad" if you apply filters after collect that could have been applied before it.
For example these expressions give the same result:
scala> persons.filter("age > 20").collect().foreach(println)
[Michael,29]
[Andy,30]
scala> persons.collect().filter(r => r.getInt(1) > 20).foreach(println)
[Michael,29]
[Andy,30]
but the second case is worse, because it transfers every row to the driver first, although the filter could have been applied before collect.
The same applies to any type of aggregation as well.
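For aggregation, the difference might look like the sketch below. It is an assumption-laden example: it uses the newer SparkSession API (Spark 2 or later) rather than the older version shown in the session above, runs in local mode, and rebuilds a small persons DataFrame with made-up construction code so that it is self-contained.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: Spark 2+ is on the classpath; local mode is used for the sketch.
val spark = SparkSession.builder().master("local[*]").appName("collect-sketch").getOrCreate()
import spark.implicits._

// Same three rows as in the session above, rebuilt here for illustration.
val persons = Seq(("Michael", 29), ("Andy", 30), ("Justin", 19)).toDF("name", "age")

// Preferred: filter and count run inside Spark, only a single Long reaches the driver.
val countedInSpark: Long = persons.filter("age > 20").count()

// Worse: every row is shipped to the driver, then counted in a local Array.
val countedOnDriver: Int = persons.collect().count(r => r.getInt(1) > 20)

spark.stop()
```

Both values are equal, but only the first variant keeps the work distributed and avoids moving the whole data set to the driver.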