I'm new to Spark. I'm trying to implement tf-idf. I need to calculate how many times each word occurs in each document and the total number of words in each document.

I want to do this with a reduce and possibly one more operation, but I don't know how yet. Here's the input I have:

Pairs are of the form (documentName, (word, wordCount)), e.g.:

("doc1", ("a", 3)), ("doc1", ("the", 2)), ("doc2", ("a", 5)),
    ("doc2",("have", 5))

Keys are documents; values are words and how many times each word occurs in that document. I want to count the total number of words in every document and possibly calculate each word's percentage.

Output I want:

("doc1", (("a", 3), 5)) , ("doc1", (("the", 2), 5)),
    ("doc2", (("a", 5),10)), ("doc2", (("have", 5),10))

I get this effect with

corpus.join(corpus.mapValues(lambda wc: wc[1]).reduceByKey(lambda x, y: x + y))
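
For reference, here is a minimal self-contained sketch of that approach on the example pairs above (the names pairs and totals are mine, and a live SparkContext sc is assumed; output order may vary):

from operator import add

pairs = sc.parallelize([
    ("doc1", ("a", 3)), ("doc1", ("the", 2)),
    ("doc2", ("a", 5)), ("doc2", ("have", 5))])

# Total words per document: ("doc1", 5), ("doc2", 10)
totals = pairs.mapValues(lambda wc: wc[1]).reduceByKey(add)

pairs.join(totals).collect()
## [('doc1', (('a', 3), 5)), ('doc1', (('the', 2), 5)),
##  ('doc2', (('a', 5), 10)), ('doc2', (('have', 5), 10))]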

Starting point:

collect_of_docs = [(doc1, text1), (doc2, text2), ...]

def count_words(x):
    l = []
    words = x[1].split()
    for w in words:
        l.append(((w, x[0]), 1))
    return l

from operator import add
from pyspark import SparkContext

sc = SparkContext()
docs = sc.parallelize(collect_of_docs)
corpus = (docs
    .flatMap(count_words)
    .reduceByKey(add)
    .map(lambda wc: (wc[0][1], (wc[0][0], wc[1]))))  # ((word, doc), n) -> (doc, (word, n))

If possible, I'd like to do this with only one reduce operation, maybe with a tricky operator. Any help is appreciated :) Thanks in advance.

1 Answer

Generally speaking, it doesn't make sense to flatMap just to gather your data back together later. I assume your data looks more or less like this:

collect_of_docs = sc.parallelize([
    (1, "Lorem ipsum dolor sit amet, consectetur adipiscing elit."),
    (2, "Mauris magna sem, vehicula sed dictum finibus, posuere id ipsum."),
    (3, "Duis eleifend molestie dolor, quis fringilla eros facilisis ac.")])

First we'll need some helpers using a basic regular expression and a Counter:

from __future__ import division  # If for some reason you use Python 2.x
import re
from collections import Counter

def count_words(doc, pattern=re.compile(r"\w+")):
    """Given a tuple (doc_id, text)
    return a tuple (doc_id, tokens_count)

    >>> count_words((1, "Foo bar bar."))
    (1, Counter({'bar': 2, 'Foo': 1}))
    """
    (doc_id, text) = doc
    return (doc_id, Counter(pattern.findall(text)))

def compute_tf(cnt):
    """Convert term counter to term frequency

    >>> compute_tf(Counter({'Foo': 1, 'bar': 2}))
    {'Foo': 0.3333333333333333, 'bar': 0.6666666666666666}
    """
    n = sum(cnt.values())
    return {k: v / n for (k, v) in cnt.items()}

and the final results:

tfs = (collect_of_docs
    .map(count_words)
    .mapValues(compute_tf))

tfs.sortByKey().first()

## (1,
##  {'Lorem': 0.125,
##   'adipiscing': 0.125,
##   'amet': 0.125,
##   'consectetur': 0.125,
##   'dolor': 0.125,
##   'elit': 0.125,
##   'ipsum': 0.125,
##   'sit': 0.125})

Using the above, document frequency can be computed as follows:

from operator import add

dfs = (tfs
    .values()
    .flatMap(lambda kv: ((k, 1) for k in kv.keys()))
    .reduceByKey(add))

dfs.sortBy(lambda x: -x[1]).take(5)

## [('ipsum', 2),
##  ('dolor', 2),
##  ('consectetur', 1),
##  ('finibus', 1),
##  ('fringilla', 1)]

  • Yes, it's what I want :)) I want to ask one more thing out of curiosity. Let's say I want to compute the tf-idf score for each word and exhibit them in the form doc1 word1 score1 word2 score2; doc2 word1 score1 word2 score2 word3 score3; .......... What is the most efficient way here? I think I'd just iterate over the tfs keys and corresponding words and look up each word's idf from dfs. How would you do it? – dogacanb Nov 15 '15 at 21:55
  • It depends. If the number of unique terms is relatively small, then you can collect dfs, broadcast it as a dict, and flatMap over tfs (a sketch follows these comments). Otherwise I would flatMap tfs to `(term, (doc_id, freq))` and join with `dfs`. – zero323 Nov 15 '15 at 22:00
  • There is a huge difference even if it doesn't look like it. Using a broadcast variable doesn't require shuffling, so it is a local operation. If dfs is large, then broadcasting becomes the limiting factor and a shuffle / hash join becomes the better solution. – zero323 Nov 15 '15 at 22:09
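
To make the trade-off discussed in the comments concrete, here is a minimal sketch of the broadcast-based variant, building on the tfs and dfs RDDs defined above. One common idf definition, log(n_docs / df), is assumed here, and the names n_docs, dfs_bcast and tfidfs are illustrative, not part of the original answer:

from math import log

n_docs = collect_of_docs.count()

# Assumes the vocabulary is small enough to collect on the driver.
dfs_bcast = sc.broadcast(dict(dfs.collect()))

# tf-idf per document: scale each term frequency by its idf locally,
# with no shuffle involved.
tfidfs = tfs.mapValues(lambda tf: {
    term: freq * log(n_docs / dfs_bcast.value[term])
    for (term, freq) in tf.items()})

tfidfs.sortByKey().first()

With a large vocabulary, the join-based variant from the comment applies instead: flatMap tfs to (term, (doc_id, freq)) pairs and join against dfs, trading the broadcast for a shuffle.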