3

I want to convert a Dstream to a DataFrame in order to apply same transformations on this DataFrame and call a NaiveBayesModel model to predict target probability, I use Apache Spark 2.1.1, the Dstream is builded from a socketTextStream. I tried to call a foreachRDD function of the Dstream but it dosen't work.

def predict(rdd):
    count = rdd.count()
    if(count>0):
        hashingTF = HashingTF(numFeatures=1000)
        features = hashingTF.transform(rdd)
        result = model.transform(features)
        return result.probability
    else:
        print("No data receveid")

model = NaiveBayesModel.load(sc, "ML_models/NaiveClassifier/naiveBayesClassifier-2010-09-10-08-51-25")
lines = ssc.socketTextStream("localhost", 9999)
tweets = lines.map(lambda v: json.loads(v))
text_dstream = tweets.map(lambda tweet: tweet['text'])
df = text_dstream.foreachRDD(lambda rdd: predict(rdd))
ssc.start()             # Start the computation
ssc.awaitTermination()

I get the following error message

AttributeError: 'RDD' object has no attribute '_jdf'

My idea consist of converting the Dstream to a Spark DataFrame and apply the transformation using :

#Tokenize sentiment text
tokenizer = Tokenizer(inputCol="SentimentText", outputCol="SetimentTextTokenize")
wordsData = tokenizer.transform(df)

hashingTF = HashingTF(inputCol="SetimentTextTokenize", outputCol="rawFeatures", numFeatures=1000)
featurizedData = hashingTF.transform(wordsData)
Aissa El Ouafi
  • 157
  • 1
  • 4
  • 14

0 Answers0