
I'm getting an NPE when trying to coalesce and save out an RDD.

The code works locally, and works on the cluster in the Scala shell, but throws the error when I submit it as a job to the cluster.

I've tried printing a sample with take() to see if the RDD contains any null data, but that throws the same error - a pain, because it works OK in the shell.
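
Roughly what I ran (a sketch; `labDistVect` is the RDD of `LabeledPoint`s built earlier in the job):

    // Pull a small sample back to the driver to eyeball the data -
    // this fails with the same NPE on submit, but works in the shell.
    labDistVect.take(10).foreach(println)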

I'm saving out to HDFS and have the full URL path in the variable - the model saves fine with this method during the MLlib training phase.

Any ideas much appreciated!

Scala Code (Whole Prediction Func):

    //Load the Random Forest model saved during training
    import org.apache.spark.mllib.tree.model.RandomForestModel
    val rfModel = RandomForestModel.load(sc, modelPath)

    //Make the predictions - here the label is the unique ID of the point
    val rfPreds = labDistVect.map(p => (p.label, rfModel.predict(p.features)))

    //Save the results (saveAsTextFile is an action and returns Unit,
    //so there is nothing useful to bind to a val)
    println("Done Modelling, now saving preds")
    rfPreds.coalesce(1, shuffle = true).saveAsTextFile(outPreds)
    println("Done Modelling, now saving coords")
    coords.coalesce(1, shuffle = true).saveAsTextFile(outCoords)

Stack Trace:

    Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0 (TID 40, XX.XX.XX.XX): java.lang.NullPointerException
    at GeoDistPredict1$$anonfun$38.apply(GeoDist1.scala:340)
    at GeoDistPredict1$$anonfun$38.apply(GeoDist1.scala:340)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)

  • What's at `GeoDist1.scala:340`? – Kenney Oct 03 '15 at 13:46
  • `val rfPreds = labDistVect.map(p => (p.label, rfModel.predict(p.features)))` – Dusted Oct 03 '15 at 13:51
  • Check if a point doesn't have features. – eliasah Oct 03 '15 at 13:59
  • Will check, but would that explain why it works in the cluster shell but not on cluster submit? – Dusted Oct 03 '15 at 14:03
  • That's a lot of code. Please try to reduce the problem to a minimal example before posting to Stack Overflow. It has several benefits: 1) it makes it easier to answer, 2) it is more likely that someone in the future will have the same problem, 3) 90% of the time you figure out what the issue was while reducing the problem. – Daniel Darabos Oct 03 '15 at 21:19
  • Sure - I'll put it back to the original, but Imagin (comment below) needed some extra detail to help - that's why I dropped a few more lines in. – Dusted Oct 05 '15 at 07:04
  • I think you are starting up your Scala shell wrongly, as it should show the same behavior as when you submit the job. Most likely you are running your Spark shell as a single instance outside your cluster. – YoYo May 10 '16 at 15:12

1 Answer


Spark operations are divided into lazy transformations and actions.

A lazy transformation on an RDD is only executed when an action is called on that RDD. When you apply a transformation, it is merely recorded as an operation to be performed later.

The saveAsTextFile method is an action, while the map operation is a transformation.

If anything goes wrong in a transformation step, the failure surfaces at the action that triggers it.

So the problem most likely lies in your map operation: a null value in one of the fields is probably what is causing the NPE.
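
To see the laziness in action, here is a minimal sketch (contrived data, not your RDD; `sc` is the usual SparkContext and the output path is illustrative):

    // A contrived RDD with a null element standing in for bad data.
    val data = sc.parallelize(Seq("a", "bb", null))

    // No error yet: map is lazy, so nothing has been evaluated.
    val lengths = data.map(s => s.length)

    // The NPE only surfaces here, when the action forces the map to run -
    // which is why your stack trace points at the save stage.
    lengths.saveAsTextFile("hdfs:///tmp/lengths")

    // Defensive variant: drop null records before mapping.
    val safeLengths = data.filter(_ != null).map(s => s.length)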

Ajay Gupta
  • Adding to this answer: coalesce with 1 partition is not a good idea or good practice, since you are trying to move all your data into one partition, which will overwhelm the driver and might cause an OOME issue (see the sketch after these comments for an alternative). – eliasah Oct 03 '15 at 14:53
  • OK thanks, I'll look into these and come back - guess I'm still a little unsure as to why the cluster shell works - I thought these operations would have the same lazy/action split? – Dusted Oct 03 '15 at 15:01
  • @Dusted With the information given, we still can't answer why it works in the shell but not in cluster mode. You'll need to give more information about how you are submitting your app, your cluster configuration, etc. – eliasah Oct 03 '15 at 16:04
  • I've deployed the cluster using the EC2 scripts - everything standard: Spark 1.5.0, Hadoop 1.x, Linux AMIs. Data is copied to HDFS (the model saved in the previous step is there as well). The app is written in Scala - packaged locally and scp'd to the server, then run with this command: ` ./spark/bin/spark-submit --class "GeoDistPredict1" --master spark://ec2-XX-XX-XX-XX.eu-west-1.compute.amazonaws.com:7077 Geo-Pred_2.10-1.0.jar `. When I run the Scala shell from /bin I just paste the lines straight in. I'm pretty much a noob with Spark (as you can probably tell) so I'm not sure what other info I can give - but let me know. – Dusted Oct 03 '15 at 17:31
  • @Dusted: Are you running on the same data in the local and production environments? Or on small data locally and huge data on the cluster? – Ajay Gupta Oct 03 '15 at 17:36
  • Yeah - rolled back to the same (small) data for local, cluster (shell) and cluster (submit) to help debug - it only fails on cluster submit. – Dusted Oct 03 '15 at 17:47
  • @Dusted: Can you please add some more parts of the code, if possible? – Ajay Gupta Oct 03 '15 at 17:49
  • @Dusted: Try this code - I am not sure it will work, but give it a try: `val rfPreds = labDistVect.filter(x => x.label != None).map(p => (p.label, rfModel.predict(p.features)))` – Ajay Gupta Oct 03 '15 at 18:27
  • @imagin: Yeah, similar to what I was just doing - it still failed. I've checked the features and labels - they are all good: vectors are dense, counts are equal, no nulls. The model loads fine and prints its debug string OK. – Dusted Oct 03 '15 at 18:45
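
As an alternative to coalesce(1) (per eliasah's comment above), one option is to save with the RDD's existing partitioning and merge the part files afterwards. A sketch, assuming Hadoop 1.x/2.x where FileUtil.copyMerge is available; `outPredsMerged` is an illustrative path:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

    // Write with the RDD's existing partitioning - no shuffle into one partition.
    rfPreds.saveAsTextFile(outPreds)

    // Then merge the part files into a single HDFS file.
    val conf = new Configuration()
    val fs = FileSystem.get(conf)
    FileUtil.copyMerge(fs, new Path(outPreds), fs, new Path(outPredsMerged),
      false, conf, null)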