
I have a variable "myrdd" that is an Avro file with 10 records, loaded through hadoopFile.

When I do

myrdd.first._1.datum.getName()

I can get the name. Problem is, I have 10 records in "myrdd". When I do:

myrdd.map(x => {println(x._1.datum.getName())})

it does not work and prints out a weird object a single time. How can I iterate over all records?

Rolando

1 Answer


Here is a log from a session using spark-shell with a similar scenario.

Given

scala> persons
res8: org.apache.spark.sql.DataFrame = [name: string, age: int]

scala> persons.first
res7: org.apache.spark.sql.Row = [Justin,19]

Your issue looks like

scala> persons.map(t => println(t))
res4: org.apache.spark.rdd.RDD[Unit] = MapPartitionsRDD[10]

so map just returns another RDD (the function is not applied immediately; it is applied "lazily", only when you really iterate over the result).
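As a minimal sketch of that laziness (not from the original session; it assumes the persons DataFrame above in a local spark-shell):

val mapped = persons.map(t => println(t)) // lazy: nothing is printed here
mapped.count()                            // this action forces evaluation; "[Justin,19]" appears now (on the executors if run on a cluster)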

So when you materialize (using collect()) you get a "normal" collection:

scala> persons.collect()
res11: Array[org.apache.spark.sql.Row] = Array([Justin,19])

over which you can map. Note that in this case you have a side effect in the closure passed to map (the println); the result of println is Unit:

scala> persons.collect().map(t => println(t))
[Justin,19]
res5: Array[Unit] = Array(())

Same result if collect is applied at the end:

scala> persons.map(t => println(t)).collect()
[Justin,19]
res19: Array[Unit] = Array(())

But if you just want to print the rows, you can simplify it by using foreach:

scala> persons.foreach(t => println(t))
[Justin,19]

As @RohanAletty has pointed out in a comment, this works for a local Spark job. If the job runs in a cluster, collect is required as well:

persons.collect().foreach(t => println(t))
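Applied back to the question's myrdd, the same pattern would look roughly like this (a sketch only; it assumes each element is a (key, value) pair whose key wraps an Avro record exposing getName, exactly as written in the question):

// sketch: myrdd, _1.datum and getName are taken from the question, not verified here
myrdd.collect().foreach(x => println(x._1.datum.getName()))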

Notes

  • The same (lazy) behaviour can be observed with Scala's Iterator class (see the small sketch after these notes).
  • The output of the session above has been reordered.
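A small sketch of the Iterator analogy (plain Scala, not Spark, and not from the original session):

// a plain Scala Iterator is also lazy: map alone prints nothing
val it = Iterator(1, 2, 3).map(println) // it: Iterator[Unit], no output yet
it.foreach(identity)                    // consuming the iterator prints 1, 2, 3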

Update

As for filtering: the placement of collect is "bad" if you apply filters after collect that could have been applied before it.

For example these expressions give the same result:

scala> persons.filter("age > 20").collect().foreach(println)
[Michael,29]
[Andy,30]

scala> persons.collect().filter(r => r.getInt(1) >= 20).foreach(println)
[Michael,29]
[Andy,30]

but the 2nd case is worse, because that filter could have been applied before collect.

The same applies to any type of aggregation as well.
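For illustration, a rough sketch of the same point with an aggregation (not from the original session):

// aggregate on the cluster: only the resulting count reaches the driver
persons.filter("age > 20").count()

// worse: ship every row to the driver first, then count locally
persons.collect().count(r => r.getInt(1) > 20)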

Beryllium
  • Just a small note: if this is running on a cluster, you would need to use `take()` or `collect()` on the RDD before calling `foreach(println)` (see the `take()` sketch after these comments). This is because data must be brought back to the driver, since the Spark context runs there. (For a local Spark program, your answer works fine.) – Rohan Aletty Oct 09 '15 at 15:14
  • If I had a million records, I heard collect is bad because all that data is sent to your machine... Is there a way to make it so that, say, I could "prefilter" out all the names that are "Justin" and age 18/19 before doing the foreach(println)? Basically, if I have a 5 node cluster, I want to "divvy" up the prefiltering work amongst all the machines before looping through the remaining records I want to output. – Rolando Oct 09 '15 at 15:23
  • @Rolando Please have a look at the updated answer - I have added an example with a filter on "both sides" of `collect`. – Beryllium Oct 10 '15 at 09:41
  • @RohanAletty Thanks for the hint; I have updated the answer – Beryllium Oct 10 '15 at 12:02
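As a rough sketch of the `take()` approach mentioned in the first comment above (the limit of 20 and the column index are illustrative assumptions, not from the original answer):

// take(n) brings at most n rows back to the driver, instead of collecting the whole dataset
persons.filter("age > 20").take(20).foreach(println)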