I have a DataFrame like the one below:

  private val sample = Seq(
    (1, "A B C D E"),
    (1, "B C D"),
    (1, "B C D E"),
    (1, "B C D F"),
    (1, "A B C"),
    (1, "B C E F G")
  )

I want to remove the least-used words from the DataFrame. To find them, I used TF-IDF.

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}

// Create the Tokenizer step
val tokenizer = new Tokenizer()
  .setInputCol("regexTransformedColumn")
  .setOutputCol("words")

// Create TF
val hashingTF = new HashingTF()
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("rawFeatures")

// Create TF IDF
val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")

// Create the pipeline
val pipeline = new Pipeline()
  .setStages(Array(tokenizer, hashingTF, idf))

val lrModel = pipeline.fit(regexTransformedLabel)

val lrOutput = lrModel.transform(regexTransformedLabel)

I get the following output:

+---------+---------------+---------------------------------------------------------------+------------------------------------------------------------------------------------------------------------+
|clusterId|words          |rawFeatures                                                    |features                                                                                                    |
+---------+---------------+---------------------------------------------------------------+------------------------------------------------------------------------------------------------------------+
|1        |[a, b, c, d, e]|(262144,[17222,27526,28698,30913,227410],[1.0,1.0,1.0,1.0,1.0])|(262144,[17222,27526,28698,30913,227410],[0.5596157879354227,0.3364722366212129,0.0,0.0,0.8472978603872037])|
|1        |[b, c, d]      |(262144,[27526,28698,30913],[1.0,1.0,1.0])                     |(262144,[27526,28698,30913],[0.3364722366212129,0.0,0.0])                                                   |
|1        |[b, c, d, e]   |(262144,[17222,27526,28698,30913],[1.0,1.0,1.0,1.0])           |(262144,[17222,27526,28698,30913],[0.5596157879354227,0.3364722366212129,0.0,0.0])                          |
|1        |[b, c, d, f]   |(262144,[24152,27526,28698,30913],[1.0,1.0,1.0,1.0])           |(262144,[24152,27526,28698,30913],[0.8472978603872037,0.3364722366212129,0.0,0.0])                          |
|1        |[a, b, c]      |(262144,[28698,30913,227410],[1.0,1.0,1.0])                    |(262144,[28698,30913,227410],[0.0,0.0,0.8472978603872037])                                                  |
|1        |[b, c, e, f, g]|(262144,[17222,24152,28698,30913,51505],[1.0,1.0,1.0,1.0,1.0]) |(262144,[17222,24152,28698,30913,51505],[0.5596157879354227,0.8472978603872037,0.0,0.0,1.252762968495368])  |
+---------+---------------+---------------------------------------------------------------+------------------------------------------------------------------------------------------------------------+

But how can I get the words back from the transformed features, so that I can remove the least-used words?

I will pass a max-features threshold and remove the words whose TF-IDF value exceeds it. If I set the threshold to 0.6, A (0.8) and G (1.2) should be removed from the DataFrame. However, I couldn't convert the features back to words, so I can't remove the least-used ones.

Tom Lous
Mohan
  • _But i couldn't convert the features to words so that i can remove the least used words_ - and you won't be able - [How to get word details from TF Vector RDD in Spark ML Lib?](https://stackoverflow.com/q/32285699). Otherwise get `idf` from `IDFModel`, find records with the highest IDF, and slice vector (keep in mind it will remove features not tokens). Or better use `CountVectorizer` with `minDF` or `vocabSize`. – zero323 May 30 '18 at 14:24

1 Answer

Using your example, I'd use a CountVectorizer and CountVectorizerModel, because HashingTF is a hashing method and therefore cannot be reversed to recover the original words.
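To illustrate why a hashed index cannot be mapped back to a word, here is a minimal plain-Scala sketch of the hashing trick. It is an assumption-laden illustration: `HashingSketch`, `bucket`, and `buckets` are hypothetical names, and it uses Scala's `MurmurHash3.stringHash` rather than Spark's internal hash, so the indices will not match those produced by `HashingTF`.

```scala
import scala.util.hashing.MurmurHash3

object HashingSketch {
  // Map a term to a bucket index in [0, numFeatures).
  // Assumption: Scala's MurmurHash3 stands in for Spark's own hash function.
  def bucket(term: String, numFeatures: Int): Int = {
    val h = MurmurHash3.stringHash(term) % numFeatures
    if (h < 0) h + numFeatures else h // keep the modulo non-negative
  }

  // Group terms by the bucket they land in. After hashing, only the
  // bucket index is kept, so terms sharing a bucket are indistinguishable.
  def buckets(terms: Seq[String], numFeatures: Int): Map[Int, Seq[String]] =
    terms.groupBy(bucket(_, numFeatures))
}
```

With seven terms and only four buckets, at least two terms must share an index, and nothing in the resulting vector records which term produced it. With the default 262144 features collisions are rarer, but the mapping is still one-way.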

Using CountVectorizer does mean that you have to fit two models: one for the CountVectorizer and one for the IDF.

In the sample the vocabulary is a local array, so if you want to run this on a cluster, it is probably better to make it a broadcast variable.

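import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{CountVectorizer, CountVectorizerModel, IDF, Tokenizer}
import org.apache.spark.ml.linalg.SparseVector
import org.apache.spark.sql.functions.udf
// The 'column symbol syntax used below also needs spark.implicits._ in scope

// Create the Tokenizer step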
val tokenizer = new Tokenizer()
  .setInputCol("regexTransformedColumn")
  .setOutputCol("words")

// Create CountVectorizer for the label vocabulary
val countVectorizer = new CountVectorizer()
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("rawFeatures")
  .setMinDF(1)

// Combine into count vectorizer pipeline
val cvPipeline = new Pipeline()
  .setStages(Array(tokenizer, countVectorizer))

// Fit the tokenizer + CountVectorizer pipeline (term frequencies)
val pipelineModel = cvPipeline.fit(regexTransformedLabel)

// Extract vocabulary
val vocabulary = pipelineModel.stages.last.asInstanceOf[CountVectorizerModel].vocabulary

// Transform the dataset to TF dataset
val termFrequencyData = pipelineModel.transform(regexTransformedLabel)

// Create IDF
val idf = new IDF().setInputCol(countVectorizer.getOutputCol).setOutputCol("features")

// Fit the IDF on the TF data
val lrModel = idf.fit(termFrequencyData)

// Transform the TF data into TF-IDF data
val lrOutput = lrModel.transform(termFrequencyData)


// Returns the words whose TF-IDF weight is below the threshold (the words to keep)
def removeLeastUsed(threshold: Double) = udf((features: SparseVector) => {
  (features.indices zip features.values) filter (_._2 < threshold) map {
    case (index, _) => vocabulary(index)
  }
})


lrOutput
  .select(
    'regexTransformedColumn,
    'features,
    removeLeastUsed(0.6)('features).as("mostUsedWords")
  )
  .show(false)

Output:

+----------------------+----------------------------------------------------------------------------------+-------------+
|regexTransformedColumn|features                                                                          |mostUsedWords|
+----------------------+----------------------------------------------------------------------------------+-------------+
|A B C D E             |(7,[0,1,2,3,4],[0.0,0.0,0.3364722366212129,0.5596157879354227,0.8472978603872037])|[c, b, d, e] |
|B C D                 |(7,[0,1,2],[0.0,0.0,0.3364722366212129])                                          |[c, b, d]    |
|B C D E               |(7,[0,1,2,3],[0.0,0.0,0.3364722366212129,0.5596157879354227])                     |[c, b, d, e] |
|B C D F               |(7,[0,1,2,5],[0.0,0.0,0.3364722366212129,0.8472978603872037])                     |[c, b, d]    |
|A B C                 |(7,[0,1,4],[0.0,0.0,0.8472978603872037])                                          |[c, b]       |
|B C E F G             |(7,[0,1,3,5,6],[0.0,0.0,0.5596157879354227,0.8472978603872037,1.252762968495368]) |[c, b, e]    |
+----------------------+----------------------------------------------------------------------------------+-------------+
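The weights in the table can be checked by hand. Below is a minimal plain-Scala sketch (no Spark required) that recomputes them, assuming the smoothed formula documented for Spark's `IDF`, log((m + 1) / (df + 1)), where m is the number of documents and df the number of documents containing the term. Every word occurs at most once per row in this sample, so the TF-IDF weight equals the IDF. `IdfSketch` and `keepFrequent` are hypothetical names used only for illustration.

```scala
object IdfSketch {
  // The six sample documents, lower-cased and split on spaces like Tokenizer does.
  val docs: Seq[Seq[String]] = Seq(
    "A B C D E", "B C D", "B C D E", "B C D F", "A B C", "B C E F G"
  ).map(_.toLowerCase.split(" ").toSeq)

  // Document frequency: the number of documents that contain each term.
  val docFreq: Map[String, Int] =
    docs.flatMap(_.distinct).groupBy(identity).map { case (t, occ) => t -> occ.size }

  // Smoothed IDF: log((m + 1) / (df + 1)).
  val idf: Map[String, Double] =
    docFreq.map { case (t, df) => t -> math.log((docs.size + 1.0) / (df + 1.0)) }

  // Keep only the tokens whose weight is below the threshold.
  def keepFrequent(tokens: Seq[String], threshold: Double): Seq[String] =
    tokens.filter(t => idf(t) < threshold)
}
```

For example, `d` occurs in 4 of the 6 documents, giving log(7 / 5) ≈ 0.336, and `g` occurs in 1, giving log(7 / 2) ≈ 1.253, which matches the `features` column. Calling `IdfSketch.keepFrequent(IdfSketch.docs.head, 0.6)` drops `a` and keeps `b`, `c`, `d`, `e`, in document order rather than vocabulary order.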
Tom Lous
  • Is there a way I can tell the TF-IDF algorithm to consider only the documents with the same cluster_id when calculating IDF on my input data? Basically, calculate TF-IDF per cluster_id. – Mohan May 30 '18 at 15:28
  • I don't really understand what you mean, but you could separate the TF-IDF models using `mapPartitions` on your data. It would probably be more efficient, though, to train the models on a subset of the data and transform the real data using the pre-trained models. You would probably miss some items in your vocabulary, but those would be rare on average anyway. – Tom Lous May 30 '18 at 18:28
  • Each row of my input data represents a document, and the documents are grouped by cluster_id. I want to calculate TF-IDF only across the documents that belong to the same cluster_id. Say I have 20 different cluster_ids, each with 100 rows (documents): I want to calculate TF-IDF separately for the 100 rows belonging to each cluster_id. How do I separate the TF-IDF models using mapPartitions? I couldn't find a proper example. – Mohan May 31 '18 at 02:50
  • I would actually first collect a list of all the potential clusterIds, then map over this list, filtering a (cached?) source training DataFrame and training individual CountVectorizer / IDF models for each cluster. Collect these models together with the correct clusterId, then map over the list again and transform a DataFrame filtered on clusterId with the appropriate models. You could even union all these transformed DataFrames to end up with one dataset at the end. – Tom Lous May 31 '18 at 06:42
  • Your suggestion looks good to me. Won't mapping over the cluster list and filtering hurt performance if the cluster list is huge? Would data partitioned by cluster_id help improve performance? – Mohan May 31 '18 at 06:55
  • Yes, if you have the option to read from Parquet partitioned by clusterId, that would help significantly. – Tom Lous May 31 '18 at 07:00
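
The per-cluster approach suggested in the comments above could look roughly like the sketch below. It is not the commenter's code: `PerClusterTfIdf`, `fitPerCluster`, and `transformPerCluster` are hypothetical names, the column names `clusterId` (assumed to be an `Int`) and `regexTransformedColumn` are taken from the question, and for brevity it chains the tokenizer, CountVectorizer, and IDF in a single pipeline per cluster.

```scala
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.feature.{CountVectorizer, IDF, Tokenizer}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

object PerClusterTfIdf {
  // Build a fresh tokenizer -> CountVectorizer -> IDF pipeline.
  // Assumption: input text lives in "regexTransformedColumn", as in the question.
  private def newPipeline(): Pipeline = {
    val tokenizer = new Tokenizer().setInputCol("regexTransformedColumn").setOutputCol("words")
    val cv = new CountVectorizer().setInputCol("words").setOutputCol("rawFeatures")
    val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
    new Pipeline().setStages(Array(tokenizer, cv, idf))
  }

  // Collect the distinct cluster ids to the driver, then fit one model per
  // cluster on the rows of that cluster. Caching `training` first avoids
  // rescanning the source for every filter.
  def fitPerCluster(training: DataFrame): Map[Int, PipelineModel] = {
    val ids = training.select("clusterId").distinct().collect().map(_.getInt(0))
    ids.map(id => id -> newPipeline().fit(training.filter(col("clusterId") === id))).toMap
  }

  // Transform each cluster's rows with that cluster's model and union the results.
  // Assumes `models` is non-empty; reduce throws on an empty collection.
  def transformPerCluster(data: DataFrame, models: Map[Int, PipelineModel]): DataFrame =
    models
      .map { case (id, model) => model.transform(data.filter(col("clusterId") === id)) }
      .reduce(_ union _)
}
```

Each cluster gets its own vocabulary, so the `features` vectors in the unioned result can have different sizes per cluster. The loop over cluster ids runs sequentially on the driver, which is why reading data already partitioned by clusterId should make each filter cheaper.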