I am using Apache Flink to make predictions on a stream of tweets from Twitter.
The code is implemented in Scala.
My problem is that my trained SVM model from the DataSet API needs a DataSet as input for its predict() method.
I have already seen a question here where a user said that you need to write your own MapFunction which reads the model upon the start of the job (ref: Real-Time streaming prediction in Flink using scala).
But I am not able to write/understand this code.
Even if I get the model inside the streaming MapFunction, I still need a DataSet as a parameter to predict the result.
I really hope someone can show/explain to me how this is done.
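As far as I understand the linked answer, the trick is not to call predict() on the stream at all, but to take the learned weight vector out of the SVM after fit() and apply the decision function by hand inside my own MapFunction. Below is my rough sketch of that idea; it may well be wrong: the thresholding is just my guess, and I pass the weights through the constructor instead of reading a saved model in open().

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.ml.math.{DenseVector, Vector}

// scores a (tweetText, featureVector) pair with the raw decision function w * x
// of the batch-trained SVM; the weights come in through the constructor
class SVMScorer(weights: DenseVector) extends RichMapFunction[(String, Vector), (String, Double)] {

  override def map(value: (String, Vector)): (String, Double) = {
    val (text, features) = value
    // plain dot product; as far as I can tell FlinkML thresholds this raw score at 0.0 for the class
    var score = 0.0
    var i = 0
    while (i < weights.size) {
      score += weights(i) * features(i)
      i += 1
    }
    (text, score)
  }
}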
Flink version: 1.9, Scala version: 2.11, Flink-ML: 2.11
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.ml.MLUtils
import org.apache.flink.ml.classification.SVM
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.streaming.connectors.twitter.TwitterSource

// streaming environment for the tweets, batch environment for training the SVM
val strEnv = StreamExecutionEnvironment.getExecutionEnvironment
val env = ExecutionEnvironment.getExecutionEnvironment
// this is my model; it holds all the terms needed to calculate the tf-idf values and to create a libsvm file
val featureVectorService = new FeatureVectorService
featureVectorService.learnTrainingData(labeledData, false)
// reads the created libsvm file
val trainingData: DataSet[LabeledVector] = MLUtils.readLibSVM(env, "...")
val svm = SVM()
.setBlocks(env.getParallelism)
.setIterations(100)
.setRegularization(0.001)
.setStepsize(0.1)
.setSeed(42)
//learning
svm.fit(trainingData)
// this is my Twitter stream; its text should be predicted later
val streamSource: DataStream[String] = strEnv.addSource(new TwitterSource(params.getProperties))
// the texts I want to transform to tf-idf vectors using the service above and then feed to the SVM for prediction
val tweets: DataStream[(String, String)] = streamSource
.flatMap(new SelectEnglishTweetWithCreatedAtFlatMapper)
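And this is how I imagine wiring it up, though I don't know whether it is correct. transformToFeatures is only a placeholder name, because I don't know yet how to expose the tf-idf transformation for a single tweet from my FeatureVectorService; I am also assuming the service is serializable and that the second tuple element is the tweet text.

// collect() pulls the learned weights to the client so they can be shipped to the scorer;
// I'm assuming weightsOption is the right field and that it holds exactly one vector
val weights: DenseVector = svm.weightsOption.get.collect().head

val scoredTweets: DataStream[(String, Double)] = tweets
  .map(t => (t._2, featureVectorService.transformToFeatures(t._2))) // transformToFeatures is a placeholder
  .map(new SVMScorer(weights))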