I am not sure whether this is a bug, but if you do something like this

// d: spark.RDD[String]
d.distinct().map(x => d.filter(_.equals(x)))

you will get a Java NullPointerException. However, if you do a collect() immediately after distinct(), everything works fine.
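
For reference, the collect-first variant that works looks roughly like this (a sketch, assuming the distinct values fit comfortably in driver memory):

// Sketch of the working variant: collect() brings the distinct values back to
// the driver, so the outer map runs over a plain Scala Array on the driver and
// d.filter is no longer called from inside another RDD's closure.
val distinctValues: Array[String] = d.distinct().collect()
val groups: Array[Array[String]] =
  distinctValues.map(x => d.filter(_.equals(x)).collect())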

I am using Spark 0.6.1.

Sheng

  • Are you sure the distinct is not returning any null value? Collect on collections uses a partial function that may not match null. In your case you are using a method call on an object that may be null. – Thomas Dec 07 '12 at 21:08
  • @Thomas I am pretty sure I do not have null in d. Besides, I can do d.distinct().foreach(println), which prints out all the distinct values in d successfully. – Sheng Dec 07 '12 at 21:40
  • Then the stack trace may hint at where the NPE was born. – Thomas Dec 07 '12 at 21:53
  • @Thomas, yes, distinct.foreach is fine, but the NPE was thrown immediately after that, when the program started to map the RDD returned from distinct. – Sheng Dec 07 '12 at 22:09
  • Try protecting from the NPE, for example map(x => d.filter(y => if (y != null) y.equals(x) else false)) or better yet map(x => d.filter(x == _)). Maybe d.filter allows null values. – Thomas Dec 07 '12 at 22:13
  • @Thomas I just found out d is actually null inside the map closure..., so the NPE really complains about null.filter, not _.equals(null). – Sheng Dec 07 '12 at 22:38
  • Any thoughts? d is by no means a null object, otherwise d.distinct() would fail in the first place. Can anybody shed some light on why it becomes null inside map? – Sheng Dec 08 '12 at 15:57

2 Answers


Spark does not support nested RDDs or user-defined functions that refer to other RDDs: the d captured in your map closure ends up null on the workers, which is what produces the NullPointerException. See this thread on the spark-users mailing list.

It looks like your current code is trying to group the elements of d by value; you can do this efficiently with the groupBy() RDD method:

scala> val d = sc.parallelize(Seq("Hello", "World", "Hello"))
d: spark.RDD[java.lang.String] = spark.ParallelCollection@55c0c66a

scala> d.groupBy(x => x).collect()
res6: Array[(java.lang.String, Seq[java.lang.String])] = Array((World,ArrayBuffer(World)), (Hello,ArrayBuffer(Hello, Hello)))
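
If all you actually need is a count for each distinct value rather than the grouped elements themselves, a reduceByKey sketch (assuming the same d; in a standalone program the pair-RDD implicits from SparkContext may need to be imported) avoids materializing the grouped sequences:

// Sketch: map each element to a (value, 1) pair, then sum the counts per key.
val counts = d.map(x => (x, 1)).reduceByKey(_ + _)
counts.collect()  // e.g. Array((World,1), (Hello,2))
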
Josh Rosen

What about the windowing example provided in the Spark 1.3.0 streaming programming guide?

val dataset: RDD[(String, String)] = ...
val windowedStream = stream.window(Seconds(20))...
val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }

SPARK-5063 causes the example to fail, since the join is being called from within the transform method on an RDD.
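
If the concern is about referring to another RDD from code that runs on the workers, one alternative (my own sketch, not taken from the guide; it assumes dataset is small enough to collect) is to broadcast the lookup data and do the join by hand inside the closure:

// Sketch (assumptions: `dataset` fits in memory, and `sc` is the underlying
// SparkContext, e.g. ssc.sparkContext in a streaming application).
val datasetMap = sc.broadcast(dataset.collectAsMap())
val joinedStream = windowedStream.transform { rdd =>
  rdd.flatMap { case (k, v) =>
    // look up the key in the broadcast map instead of joining two RDDs
    datasetMap.value.get(k).map(other => (k, (v, other)))
  }
}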

Chris