0
inputRDD = sc.textFile("log.txt")
errorsRDD = inputRDD.filter(lambda x: "error" in x) 
warningsRDD = inputRDD.filter(lambda x: "warning" in x) 
badLinesRDD = errorsRDD.union(warningsRDD)
badLinesCount = badLinesRDD.count()
warningCount = warningsRDD.count()

In the code above none of the transformations are evaluated until the second to last line of code is executed where you count the number of objects in the badLinesRDD. So when this badLinesRDD.count() is run it will compute the first four RDD's up till the union and return you the result. But when warningsRDD.count() is run it will only compute the transformation RDD's until the top 3 lines and return you a result correct?

Also when these RDD transformations are computed when an action is called on them where are the objects from the last RDD transformation, which is union, stored? Does it get stored in memory on the each of the DataNodes where the filter transformation was run in parallel?

zero323
  • 322,348
  • 103
  • 959
  • 935

1 Answers1

2

Unless task output is persisted explicitly (cache, persist for example) or implicitly (shuffle write) and there is enough free space every action will execute complete lineage.

So when you call warningsRDD.count() it will load the file (sc.textFile("log.txt")) and filter (inputRDD.filter(lambda x: "warning" in x)).

Also when these RDD transformations are computed when an action is called on them where are the objects from the last RDD transformation, which is union, stored?

Assuming data is not persisted, nowhere. All task outputs are discarded after data is passed to the next stage or output. If data is persisted it depends on the settings (disk, on-heap, off-heap, DFS).

zero323
  • 322,348
  • 103
  • 959
  • 935
  • Thank you for the clarification! I had one question: you stated that `"All task outputs are discarded after data is passed to the next stage or output"`. So basically that means when you first get to an action, the first four RDD transformations will be run but all of that data from those RDD transformations will be gone but when you call `val warningCount = warningsRDD.count()` for example (after the `badLinesRDD.count()` line) it will look at the dependency of the `val warningCount` which is `warningsRDD` and then it will look at the dependency of the `val warningsRDD` which is input RDD –  Mar 30 '16 at 06:05
  • and recalculate only those two transformation RDD's because of the lineage graph that was formed correct? That's why it doesn't recalculate the `errorsRDD = inputRDD.filter(lambda x: "error" in x)` since that it is not needed when evaluating the `val warningCount = warningsRDD.count()` line. –  Mar 30 '16 at 06:06
  • 1
    `errorsRDD` is not a dependency for `warningsRDD` hence it is not required. If you're in doubt you can always check DAG in the UI. See for example http://stackoverflow.com/q/34580662/1560062 – zero323 Mar 30 '16 at 06:16