I am having a little trouble deciding the best way to do this. Currently I have the following:
val df = DataFrame (just take as given)
val intrdd = df.rdd.map(row => processRow(row, df))
def processRow(row: Row, statedf: DataFrame) : Row = {
val rowvaluesmap = row.getValuesMap[Any](row.schema.fieldNames)
val imputedvals = rowvaluesmap.transform((key,value) => imputeNulls(key, value, rowvaluesmap, statedf)
val rowfinal = Row.fromSeq(imputedvals.values.toSeq)
return rowfinal
}
def imputeNulls(key: String, value: Any, rowvaluesmap: Map[String, _], statedf: DataFrame): Any = {
val rowvaluesmapnotnull = rowvaluesmap.filter((t) => t._1 == null)
val rowvaluesmapnull = rowvaluesmap.filter((t) => t._1 != null)
// keys changed for privacy
if (value != null) {
return null
} else if (value == null && (key == "x" | key == "y" | key == "z")) {
val imputedval = imputeNullValueString(key, value, rowvaluesmapnotnull, statedf)
return imputedval
} else if (value == null && (key == "1" | key == "2" | key == "3")) {
val imputedval = imputeNullValueInt(key, value, rowvaluesmapnotnull, statedf)
return imputedval
} else if (value == null && (key == "a" | key == "b" | key == "c" | key == "d")) {
val imputedval = imputeNullValueShort(key, value, rowvaluesmapnotnull, statedf)
return imputedval
} else if (value == null && (key == "z" | key == "r" | key == "w" | key == "q")) {
val imputedval = imputeNullValueFloat(key, value, rowvaluesmapnotnull, statedf)
return imputedval
} else {
return null
}
}
}
where imputeNullValueX returns a value in the appropriate format.
I am sure there is a better way to do this. Is there an optimal way to do this? I think returning the
val rowfinal = Row.fromSeq(imputedvals.values.toSeq)
return rowfinal
is screwing things up.
Thanks.