I have a corpus of documents that I'm reading into a Spark DataFrame. I have tokenized and vectorized the text, and now I want to feed the vectorized data into an MLlib LDA model. The LDA API docs seem to require the data to be:

rdd – RDD of documents, which are tuples of document IDs and term (word) count vectors. The term count vectors are “bags of words” with a fixed-size vocabulary (where the vocabulary size is the length of the vector). Document IDs must be unique and >= 0.
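
If I read that correctly, a hand-built toy corpus over a 3-word vocabulary would look something like this (my own illustration of the expected shape, using pyspark.mllib.linalg.Vectors):

from pyspark.mllib.linalg import Vectors

# toy example of the expected shape: (unique id >= 0, term-count vector)
toy_corpus = sc.parallelize([
    (0, Vectors.dense([1.0, 0.0, 2.0])),  # doc 0: dense counts
    (1, Vectors.sparse(3, {1: 1.0}))      # doc 1: sparse counts
])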

How can I get from my DataFrame to a suitable RDD?

from pyspark.mllib.clustering import LDA
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import CountVectorizer

#read the data
tf = sc.wholeTextFiles("20_newsgroups/*")

#transform into a data frame
df = tf.toDF(schema=['file','text'])

#tokenize
tokenizer = Tokenizer(inputCol="text", outputCol="words")
tokenized = tokenizer.transform(df)

#vectorize
cv = CountVectorizer(inputCol="words", outputCol="vectors")
model = cv.fit(tokenized)
result = model.transform(tokenized)

#transform into a suitable rdd
myrdd = ?

#LDA
model = LDA.train(myrdd, k=2, seed=1)

PS: I'm using Apache Spark 1.6.3

  • If I may ask, why are you using MLlib's LDA? LDA is available with spark-ml – eliasah May 25 '17 at 16:02
  • Just trying to stitch pieces together across several tutorials. Not opposed to taking a different approach. – ADJ May 25 '17 at 16:07
  • Then I advise looking at the official documentation concerning spark-ml. It's straightforward. Your result value is normally ready to feed in as is. – eliasah May 25 '17 at 16:10
  • Just checked my Spark version. It's 1.6 and it doesn't seem to have LDA in pyspark.ml.clustering – ADJ May 25 '17 at 16:11
  • If the provided solution answers your question, please accept it to close the question; otherwise, please comment on the answer about why the solution doesn't work! – eliasah May 29 '17 at 07:51
  • Will need to try it after the holiday weekend. Thanks. – ADJ May 29 '17 at 18:01

1 Answer


Let's first organize the imports, read the data, do some simple special-character removal, and transform it into a DataFrame:

import re  # needed to remove special characters
from pyspark.sql import Row

from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import Tokenizer, CountVectorizer
from pyspark.mllib.clustering import LDA
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, LongType

pattern = re.compile(r'[\W_]+')

rdd = sc.wholeTextFiles("./data/20news-bydate/*/*/*") \
    .mapValues(lambda x: pattern.sub(' ', x)).cache() # ref. https://stackoverflow.com/a/1277047/3415409

df = rdd.toDF(schema=['file', 'text'])

We will need to add an index to each Row. The following code snippet is inspired by this question about adding primary keys with Apache Spark:

row_with_index = Row(*["id"] + df.columns)

def make_row(columns):
    def _make_row(row, uid):
        row_dict = row.asDict()
        return row_with_index(*[uid] + [row_dict.get(c) for c in columns])

    return _make_row

f = make_row(df.columns)

indexed = (df.rdd
           .zipWithUniqueId()
           .map(lambda x: f(*x))
           .toDF(StructType([StructField("id", LongType(), False)] + df.schema.fields)))
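
If you would rather have consecutive ids starting at 0, a shorter variant of the same idea (just a sketch, not what I used above; call it indexed_alt) is to zip with zipWithIndex and rebuild the Row directly:

# sketch: zipWithIndex yields consecutive ids (0, 1, 2, ...) at the cost of an
# extra Spark job; Row built from keyword arguments sorts its fields
# alphabetically, so the column order may differ from the version above
indexed_alt = (df.rdd
               .zipWithIndex()
               .map(lambda pair: Row(id=pair[1], **pair[0].asDict()))
               .toDF())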

Once we have added the index, we can proceed to feature cleansing, extraction, and transformation:

# tokenize
tokenizer = Tokenizer(inputCol="text", outputCol="tokens")
tokenized = tokenizer.transform(indexed)

# remove stop words
remover = StopWordsRemover(inputCol="tokens", outputCol="words")
cleaned = remover.transform(tokenized)

# vectorize
cv = CountVectorizer(inputCol="words", outputCol="vectors")
count_vectorizer_model = cv.fit(cleaned)
result = count_vectorizer_model.transform(cleaned)

Now, let's transform the resulting DataFrame back into an RDD of (id, vector) pairs:

corpus = result.select(F.col('id').cast("long"), 'vectors').rdd \
    .map(lambda x: [x[0], x[1]])
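
Each element is now an [id, count-vector] pair, which is the shape LDA.train expects. A quick peek (purely illustrative) confirms it:

# e.g. [0, SparseVector(..., {...})] -- ids are unique and >= 0
print(corpus.take(1))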

Our data is now ready for training:

# train the LDA model
lda_model = LDA.train(rdd=corpus, k=10, seed=12, maxIterations=50)
# extract the topics
topics = lda_model.describeTopics(maxTermsPerTopic=10)
# extract the vocabulary
vocabulary = count_vectorizer_model.vocabulary
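
As an optional consistency check, the model's vocabulary size should match the fitted CountVectorizer vocabulary:

# the LDA model was trained on count vectors of length len(vocabulary)
assert lda_model.vocabSize() == len(vocabulary)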

We can now print the topic descriptions as follows:

for topic in range(len(topics)):
    print("topic {} : ".format(topic))
    words = topics[topic][0]
    scores = topics[topic][1]
    for word in range(len(words)):
        print(vocabulary[words[word]], "->", scores[word])

PS: The above code was tested with Spark 1.6.3.

eliasah
  • Now how to test unseen data with this trained model? I have implemented more or less the same code in Spark 2.1.1. I have searched for answers and most of them recommend the topicDistributions() attribute, but I am getting an error that the LDAModel object has no topicDistribution() attribute. Any other way to test unseen data? – Usman Khan Apr 24 '19 at 05:12
  • I am using spark-mllib: `from pyspark.mllib.clustering import LDA, LDAModel`. I have posted the question [question] – Usman Khan Apr 24 '19 at 08:55