I am using Apache Flink to run predictions on a stream of tweets from Twitter.

The code is implemented in Scala.

My problem is that my trained SVM model from the DataSet API needs a DataSet as input for its predict() method.

I have already seen a question here in which a user said that you need to write your own MapFunction that reads the model when the job starts (ref: Real-Time streaming prediction in Flink using scala).

But I am not able to write or understand this code.

Even if I get the model inside the streaming MapFunction, I still need a DataSet as a parameter to predict the result.

I really hope someone can show or explain to me how this is done.

Flink version: 1.9, Scala version: 2.11, Flink-ML: 2.11

val strEnv = StreamExecutionEnvironment.getExecutionEnvironment
val env = ExecutionEnvironment.getExecutionEnvironment

// This service holds my model: all the terms needed to calculate the tf-idf values and to create a LibSVM file
val featureVectorService = new FeatureVectorService
featureVectorService.learnTrainingData(labeledData, false)

// Read the created LibSVM file
val trainingData: DataSet[LabeledVector] = MLUtils.readLibSVM(env, "...")
val svm = SVM()
  .setBlocks(env.getParallelism)
  .setIterations(100)
  .setRegularization(0.001)
  .setStepsize(0.1)
  .setSeed(42)

// Train the model
svm.fit(trainingData)

// This is my Twitter stream; its text should be predicted later
val streamSource: DataStream[String] = strEnv.addSource(new TwitterSource(params.getProperties))

// The texts I want to transform to tf-idf vectors using the service above and pass to the SVM for prediction
val tweets: DataStream[(String, String)] = streamSource
  .flatMap(new SelectEnglishTweetWithCreatedAtFlatMapper)

IboJaan

1 Answer

Currently FlinkML, which SVM is part of, does not support the streaming API. That is why SVM accepts only a DataSet. The idea is not to use FlinkML for the prediction step, but rather an SVM library available in Scala or Java. You could then read the model, for example from a file, inside your streaming job. The downside is that you have to implement most of the logic yourself.
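To make the "implement the logic yourself" part more concrete, here is a minimal sketch in plain Scala of what prediction with a linear SVM boils down to: a dot product between a weight vector and the feature vector, followed by a threshold. The `LinearSvmModel` class, the one-weight-per-line file format and the `fromLines`/`fromFile` helpers are assumptions made for illustration, not part of FlinkML or any other library; how the weights get exported from training is left open.

```scala
import scala.io.Source

// Hypothetical model holder: one weight per feature index, as a linear SVM would learn.
final case class LinearSvmModel(weights: Array[Double], threshold: Double = 0.0) {

  // Raw decision value w . x for a sparse vector given as (index, value) pairs.
  def decisionValue(features: Seq[(Int, Double)]): Double =
    features.iterator
      .filter { case (i, _) => i >= 0 && i < weights.length } // skip indices the model has no weight for
      .map { case (i, v) => weights(i) * v }
      .sum

  // Class label: 1.0 if the decision value exceeds the threshold, otherwise -1.0.
  def predict(features: Seq[(Int, Double)]): Double =
    if (decisionValue(features) > threshold) 1.0 else -1.0
}

object LinearSvmModel {
  // Assumed format: one weight per line, line k holding the weight of feature index k.
  def fromLines(lines: Iterator[String]): LinearSvmModel =
    LinearSvmModel(lines.map(_.trim).filter(_.nonEmpty).map(_.toDouble).toArray)

  // Reads the assumed weight file from a local path.
  def fromFile(path: String): LinearSvmModel = {
    val source = Source.fromFile(path)
    try fromLines(source.getLines()) finally source.close()
  }
}
```

Because this works on a single vector instead of a DataSet, it can be called once per element from a streaming function. The feature indices must of course match the ones used during training.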

The comment in the post you mentioned says more or less the same thing.

Dominik Wosiński
  • Thanks for your answer :) I know that the SVM from FlinkML does not support streaming by itself. But I found the post I linked, where it looks like it is possible with some hints. I just didn't get the logic behind it. – IboJaan Oct 24 '19 at 11:34
  • Yeah, the answer to the post you are mentioning says more or less the same thing as I described :) You should read a model from a file and store it as the state in the `MapFunction`. The `Model` type mentioned in there is just an interface that doesn't do much: https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/ml/api/core/Model.html – Dominik Wosiński Oct 24 '19 at 11:46
  • Does reading a model from a file mean my SVM in this case? But how do I make it accept a Vector or DataStream[Vector] to predict? (Thanks again for your answer.) – IboJaan Oct 24 '19 at 12:08
  • You could possibly use one of the SVM libraries available for Java, like https://www.csie.ntu.edu.tw/~cjlin/libsvm, or any other library :) But you will most certainly need to adapt it to allow usage of `Vector`. – Dominik Wosiński Oct 28 '19 at 12:17
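
As a rough illustration of the approach discussed in these comments (load the model once, keep it inside the function, predict element by element), the following sketch uses a `RichMapFunction` whose `open()` method reads linear SVM weights from a file. The class name `SvmPredictMapFunction`, the `weightsPath` parameter, the one-weight-per-line file format and the `(key, sparse features)` input type are all assumptions for illustration; it also assumes a linear model with a decision threshold of 0, and it is not taken from FlinkML.

```scala
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import scala.io.Source

// Hypothetical function: maps (key, sparse tf-idf features) to (key, predicted label).
class SvmPredictMapFunction(weightsPath: String)
    extends RichMapFunction[(String, Seq[(Int, Double)]), (String, Double)] {

  // Filled in open(), once per parallel task; not shipped with the serialized function.
  @transient private var weights: Array[Double] = _

  override def open(parameters: Configuration): Unit = {
    // Assumed format: one weight per line, line k holding the weight of feature index k.
    val source = Source.fromFile(weightsPath)
    try weights = source.getLines().map(_.trim).filter(_.nonEmpty).map(_.toDouble).toArray
    finally source.close()
  }

  override def map(in: (String, Seq[(Int, Double)])): (String, Double) = {
    val (key, features) = in
    // Dot product of the weights with the sparse features; unknown indices are skipped.
    val score = features.collect {
      case (i, v) if i >= 0 && i < weights.length => weights(i) * v
    }.sum
    (key, if (score > 0.0) 1.0 else -1.0)
  }
}
```

On a stream of already vectorized tweets (a hypothetical `DataStream[(String, Seq[(Int, Double)])]`), this would be applied with `.map(new SvmPredictMapFunction("..."))`. The weight file has to be readable from every task manager, since `open()` runs on the workers.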