I have a saved H2O model in MOJO format, and I am trying to load it and use it to make predictions on a new dataset (df) as part of a Spark app written in Scala. Ideally, I want to append a new column to the existing DataFrame containing the class probability predicted by this model.

I can see how to apply a mojo to an individual row already in a RowData format (as per answer here), but I am not sure how to map over an existing DataFrame so that it is in the right format to make predictions using the mojo model. I have worked with DataFrames a fair bit, but never with the underlying RDDs.

Also, should this model be serialised / broadcast so that predictions can be done in parallel on a cluster, or will it be available to all executors as part of the map?

I have gotten this far:

// load the MOJO model and create the easy-predict model wrapper
val mojo = MojoModel.load("loca/path/to/mojo/mojo.zip")
val easyModel = new EasyPredictModelWrapper(mojo)

// map over the Spark DataFrame, convert it to an RDD, and make predictions on each row:
df.rdd.map { row =>
  val prediction = easyModel.predictBinomial(row).classProbabilities
  println(prediction)
}

But my row variable is not in the right format for this to work. Any suggestions on what to try next?

EDIT: my DataFrame consists of 70 predictive feature columns which are a mixture of integers and category/factor columns. A very simple sample DataFrame:

val df = Seq(
  (0, 3, "cat1"),
  (1, 2, "cat2"),
  (2, 6, "cat1")
).toDF("id", "age", "category")
renegademonkey

2 Answers

Use this function to build the RowData object that H2O needs from a Spark Row:

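import hex.genmodel.easy.RowData
import org.apache.spark.sql.{DataFrame, Row}

// Convert a Spark Row into the RowData map that EasyPredictModelWrapper expects.
def rowToRowData(df: DataFrame, row: Row): RowData = {
  val rowAsMap = row.getValuesMap[Any](df.schema.fieldNames)
  // Leave null values out of the map; pass everything else as a String.
  rowAsMap.foldLeft(new RowData()) { case (rd, (k, v)) =>
    if (v != null) { rd.put(k, v.toString) }
    rd
  }
}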
Dmitry
  • Hi @Dmitry, thanks for this. I just got back from holidays and am trying it out. My code is: `df.rdd.map { r => val rData = rowToRowData(df, r); val prediction = easyModel.predictBinomial(rData).classProbabilities; println("prediction = "+prediction.mkString(",")) }`, but I am not seeing any predictions being printed. I have been able to output them using `.collect()` instead of `.rdd`, but this is not practical for large datasets. How do I append the prediction as an additional column of the distributed df, or alternatively as a new df in parallel? – renegademonkey Jan 02 '18 at 15:55
  • I think the problem is that you are trying to print from inside the rdd.map method. That code executes on the Spark executors, so the output is printed to each executor's stdout. If you want to see output in the console (= the driver's stdout), your options are: 1) collect(), as you suggested; 2) in the map code, return a Row with the predictions, convert the resulting RDD back to a DataFrame, and then use the show() method to see the results. The second option also allows you to write results to HDFS without collecting everything on the driver. – Dmitry Jan 06 '18 at 11:35
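
The second option from the comment above could look roughly like the sketch below. It is not code from the original answer: `withPrediction`, the output column name `p1`, and the choice to load the MOJO once per partition are illustrative assumptions. It also assumes the MOJO file is readable at `mojoPath` on every executor, and that index 1 of `classProbabilities` is the class of interest. Note that `map` and `mapPartitions` are lazy, so nothing is scored until an action such as `show()` or a write runs.

```scala
import hex.genmodel.MojoModel
import hex.genmodel.easy.{EasyPredictModelWrapper, RowData}
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}

// Hypothetical helper: returns df with one extra Double column "p1".
// Assumes mojoPath points to a file that exists on every executor.
def withPrediction(df: DataFrame, mojoPath: String): DataFrame = {
  // Plain Array[String]; cheap to ship inside the task closure.
  val fieldNames = df.schema.fieldNames
  val outSchema = StructType(
    df.schema.fields :+ StructField("p1", DoubleType, nullable = false))

  val scored = df.rdd.mapPartitions { rows =>
    // Load the model on the executor, once per partition, so the
    // wrapper itself never has to be serialised or broadcast.
    val model = new EasyPredictModelWrapper(MojoModel.load(mojoPath))
    rows.map { row =>
      val rd = new RowData()
      fieldNames.foreach { name =>
        val v = row.getAs[Any](name)
        if (v != null) rd.put(name, v.toString) // nulls are left out
      }
      val probs = model.predictBinomial(rd).classProbabilities
      // Original columns plus the probability at index 1.
      Row.fromSeq(row.toSeq :+ probs(1))
    }
  }
  // Back to a DataFrame, still distributed; nothing is collected.
  df.sparkSession.createDataFrame(scored, outSchema)
}
```

Calling `withPrediction(df, "path/to/mojo.zip").show()` would then print a sample on the driver, while `.write` would save the full result without collecting it. Loading per partition is one way to sidestep the serialise-or-broadcast question; broadcasting the wrapper may also work if it serialises cleanly in your H2O version.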

I have a complete answer here: https://stackoverflow.com/a/47898040/9120484. In short, you can call map on df directly instead of on df.rdd.
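
As a rough illustration of calling map on the DataFrame itself (this is not the code from the linked answer), the sketch below returns an (id, probability) pair per row. `scoreWithMap` and the column name `p1` are hypothetical; it assumes the sample schema from the question, where `id` is an Int, and it assumes the wrapper can be serialised into the task closure. `Dataset.map` needs an Encoder for its result type, which `spark.implicits._` provides for tuples of primitives.

```scala
import hex.genmodel.MojoModel
import hex.genmodel.easy.{EasyPredictModelWrapper, RowData}
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical helper: scores df with Dataset.map and returns
// a DataFrame with columns "id" and "p1".
def scoreWithMap(spark: SparkSession, df: DataFrame, mojoPath: String): DataFrame = {
  import spark.implicits._ // supplies the Encoder[(Int, Double)] for map
  val fieldNames = df.schema.fieldNames
  // Assumption: the wrapper is serialisable, so Spark can ship it to
  // the executors as part of the closure below.
  val model = new EasyPredictModelWrapper(MojoModel.load(mojoPath))

  df.map { row =>
    val rd = new RowData()
    fieldNames.foreach { name =>
      val v = row.getAs[Any](name)
      if (v != null) rd.put(name, v.toString)
    }
    // Assumes "id" is an Int column, as in the sample DataFrame.
    (row.getAs[Int]("id"), model.predictBinomial(rd).classProbabilities(1))
  }.toDF("id", "p1")
}
```

The result can be joined back to df on `id` if the original feature columns are needed alongside the prediction.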

jliu3230