
In Spark Streaming, every time a new message is received, a model is used to predict something based on it. But the model can change over time, so I want to re-load the model whenever a new message comes in. My code looks like this:

def loadingModel(@transient sc: SparkContext) = {
  val model = LogisticRegressionModel.load(sc, "/home/zefu/BIA800/LRModel")
  model
}

var error = 0.0
var size = 0.0
implicit def bool2int(b: Boolean) = if (b) 1 else 0
def updateState(batchTime: Time, key: String, value: Option[String], state: State[Array[Double]]): Option[(String, Double, Double)] = {
  val model = loadingModel(sc)
  val parts = value.getOrElse("0,0,0,0").split(",").map { _.toDouble }
  val pairs = LabeledPoint(parts(0), Vectors.dense(parts.tail))
  val prediction = model.predict(pairs.features)
  val wrong = prediction != pairs.label
  error = state.getOption().getOrElse(Array(0.0, 0.0))(0) + 1.0 * (wrong: Int)
  size = state.getOption().getOrElse(Array(0.0, 0.0))(1) + 1.0
  val output = (key, error, size)
  state.update(Array(error, size))
  Some(output)
}
val stateSpec = StateSpec.function(updateState _)
  .numPartitions(1)
setupLogging()
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
val topics = List("test").toSet
val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics).mapWithState(stateSpec)

When I run this code, I get an exception like this:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable

If you need more information, please let me know. Thank you!

Zefu Hu
  • Can you post the full stack trace? Also, are you trying to use any class which is not serializable inside Spark transformations like map or filter? – Shankar Oct 17 '16 at 04:50
  • @Shankar Hi, if I just load the model (`val model=LogisticRegressionModel.load(sc, "/home/zefu/BIA800/LRModel")`) outside `updateState` without defining `loadingModel`, it works fine. I guess the problem comes from `sc` – Zefu Hu Oct 17 '16 at 05:29
  • @Shankar and I've added more code there, :P – Zefu Hu Oct 17 '16 at 05:35
  • Were you able to do this? I am facing a similar serializable exception when I am trying to run an MLlib model's predict function in the streaming context. I am using Spark 1.4.0 – Srivatsan Nallazhagappan Nov 08 '16 at 19:26
  • @SrivatsanNallazhagappan Unfortunately, no. I tried [PMML](https://github.com/jpmml) but ran into the same serialization problem. A potential solution is to send the new model through a Kafka topic, but I would have to write a new serializer for MLlib and haven't tried that yet. – Zefu Hu Nov 09 '16 at 02:50

1 Answer


When a model is used within a DStream function, Spark seems to serialize the context object (because the model's load function uses sc), and this fails because the context object is not serializable. One workaround is to convert the DStream to an RDD, collect the result, and then run model prediction/scoring on the driver.

I used the netcat utility to simulate streaming and tried the following code to convert the DStream to an RDD; it works. See if it helps.

import org.apache.spark.mllib.classification.LogisticRegressionModel
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(10))
val lines = ssc.socketTextStream("xxx", 9998)
// Parse each incoming line of space-separated numbers into a feature vector
val linedstream = lines.map(line => Vectors.dense(line.split(" ").map(_.toDouble)))
// Load the model once, on the driver
val logisModel = LogisticRegressionModel.load(sc, "/path/LR_Model")
linedstream.foreachRDD { rdd =>
  // collect() pulls each batch to the driver, so prediction runs there
  // and the model never has to be serialized out to executors
  for (item <- rdd.collect()) {
    val predictedVal = logisModel.predict(item)
    println(predictedVal + "|" + item)
  }
}
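
To drive this, start netcat on the host the socket points to (for example, `nc -lk 9998`), type lines of space-separated feature values, and start the streaming context with `ssc.start()` followed by `ssc.awaitTermination()`.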

I understand that collect is not scalable here, but if your streaming messages are few in number per interval, this is probably an option. This is what I found possible in Spark 1.4.0; higher versions probably have a fix for this. See this if it's useful:

Save ML model for future usage
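
If collecting every batch to the driver becomes a bottleneck, a variation on the same idea is to reload the model once per batch on the driver and broadcast it, so that sc is never captured in an executor-side closure. This is only an untested sketch under the same setup as the snippet above (same linedstream and model path); a side benefit is that reloading per batch also picks up a changed model, which is what the question asked for.

// Untested sketch: reload and broadcast the model once per batch so the
// executors can score records in parallel without ever touching sc.
linedstream.foreachRDD { rdd =>
  // foreachRDD bodies run on the driver, so using sc here is safe
  val freshModel = LogisticRegressionModel.load(sc, "/path/LR_Model")
  val bcModel = sc.broadcast(freshModel)
  // scoring happens on the executors; only the model, not sc, is shipped
  val scored = rdd.map(v => (bcModel.value.predict(v), v))
  scored.collect().foreach { case (p, v) => println(p + "|" + v) }
}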
