
I am trying to do a bigram count using Spark, Python API.

I am getting strange output: multiple lines of

<generator object <genexpr> at 0x11aab40>

This is my code:

from pyspark import SparkConf, SparkContext
import string

conf = SparkConf().setMaster('local').setAppName('BigramCount')
sc = SparkContext(conf = conf)

RDDvar = sc.textFile("file:///home/cloudera/Desktop/smallTest.txt")

sentences = RDDvar.flatMap(lambda line: line.split("."))
words = sentences.flatMap(lambda line: line.split(" "))
bigrams = words.flatMap(lambda x:[((x[i],x[i+1]) for i in range(0,len(x)-1))])

result = bigrams.map(lambda bigram: bigram, 1)
aggreg1 = result.reduceByKey(lambda a, b: a+b)

result.saveAsTextFile("file:///home/cloudera/bigram_out")

What is going wrong?

joanne

4 Answers


The function you pass to flatMap:

lambda x:[((x[i],x[i+1]) for i in range(0,len(x)-1))]

returns a list with a single element, the enclosed generator expression. flatMap flattens only that outer list, so what is left is an RDD of generators. Just drop the outer list:

words.flatMap(lambda x:((x[i],x[i+1]) for i in range(0,len(x)-1)))

or, even better, use zip:

words.flatMap(lambda xs: zip(xs, xs[1:]))
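
To see why the original lambda yields generator objects, the flattening step can be imitated locally in plain Python. This is only an analogy, assuming flatMap behaves like itertools.chain.from_iterable over the iterable each call returns; `fake_flat_map` is a hypothetical helper, not part of PySpark. The sketch also shows that, because `words` in the question holds single words, `x[i]` and `zip(xs, xs[1:])` pair characters; pairing words requires applying the lambda to a list of tokens (for example, one list per sentence).

```python
from itertools import chain


def fake_flat_map(func, elements):
    """Hypothetical local stand-in for RDD.flatMap: apply func to each
    element and flatten exactly one level of the returned iterables."""
    return list(chain.from_iterable(func(e) for e in elements))


words = ["the", "fox"]

# Original lambda: a list holding one generator, so flattening one level
# leaves the generator objects themselves as the output elements.
wrapped = fake_flat_map(lambda x: [((x[i], x[i + 1]) for i in range(len(x) - 1))], words)
print(wrapped)  # a list of two generator objects

# zip version: the pairs are flattened correctly, but each element is a
# string, so the pairs are characters, e.g. ('t', 'h').
char_pairs = fake_flat_map(lambda xs: zip(xs, xs[1:]), words)
print(char_pairs)

# The same lambda applied to token lists (one per sentence) pairs words.
sentences = [["the", "brown", "fox"], ["a", "dog"]]
word_pairs = fake_flat_map(lambda xs: zip(xs, xs[1:]), sentences)
print(word_pairs)
```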
zero323

The following is my sample code.

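from __future__ import print_function

import sys
from operator import add
from pyspark import SparkContext

def split(line):
    # split() with no argument ignores leading and repeated spaces,
    # so no empty-string tokens end up in the bigrams
    words = line.split()
    return [(words[i], words[i+1]) for i in range(len(words)-1)]

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: bigram <file>", file=sys.stderr)
        sys.exit(-1)
    sc = SparkContext()
    # minPartitions=1; glom() gathers each partition's lines so they can be
    # joined into one string, keeping sentences that span line breaks together
    lines = sc.textFile(sys.argv[1], 1)
    sentences = lines.glom() \
              .map(lambda x: " ".join(x)) \
              .flatMap(lambda x: x.split("."))

    # emit ((w1, w2), 1) for every bigram, then sum the counts per bigram
    bi_counts = sentences.flatMap(split) \
        .map(lambda x: (x, 1)) \
        .reduceByKey(add)

    bi_counts.saveAsTextFile("bigram_count.out")
    sc.stop()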

HTH
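
Essentially the same split-and-count logic can be checked locally, without a SparkContext, by substituting collections.Counter for map((x, 1)) plus reduceByKey(add). This is a minimal sketch under that substitution, assuming the whole input fits in memory; `count_bigrams` is a hypothetical helper. It splits words on any whitespace (`str.split()` with no argument) so that the space left after each "." does not produce empty tokens.

```python
from collections import Counter


def split(line):
    # split() with no argument drops empty strings from leading or
    # repeated spaces, e.g. the space that follows each "."
    words = line.split()
    return [(words[i], words[i + 1]) for i in range(len(words) - 1)]


def count_bigrams(lines):
    """Hypothetical local stand-in for the Spark pipeline above."""
    text = " ".join(lines)  # like glom() + join on a single partition
    counts = Counter()
    for sentence in text.split("."):
        counts.update(split(sentence))  # like map((x, 1)) + reduceByKey(add)
    return counts


result = count_bigrams(["The fox ran. The fox", "sat."])
print(result.most_common(1))  # [(('The', 'fox'), 2)]
```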

Jun

An n-gram transformer (NGram) is already implemented in pyspark.ml; it is easy to use and efficient.

An example can be found here. NGram is part of the pyspark.ml.feature sub-package; below is an example of how to use it:

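from pyspark.ml.feature import NGram
from pyspark.sql import Row, SparkSession

spark = SparkSession.builder.appName("NGramExample").getOrCreate()
df = spark.createDataFrame([Row(tokens='The brown fox jumped over the white fence'.split())])
ngram = NGram(n=2, inputCol="tokens", outputCol="bigrams")
df = ngram.transform(df)
df.select("bigrams").show(truncate=False)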

The resulting DataFrame (df) will contain a new column named bigrams, of type Array(String), holding the bigrams built from the input column tokens.
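
Each entry in the bigrams column is a single string with the words joined by a space (for example "The brown"), not a tuple. The plain-Python function below is a sketch of that output format, assuming NGram's documented behaviour of emitting space-separated n-grams and producing nothing when a row has fewer than n tokens; `ngrams` is a hypothetical helper, handy for checking expected output without Spark.

```python
def ngrams(tokens, n=2):
    """Hypothetical local equivalent of pyspark.ml.feature.NGram output:
    each n-gram is n consecutive tokens joined by a single space."""
    return [" ".join(tokens[i:i + n]) for i in range(len(tokens) - n + 1)]


tokens = "The brown fox jumped over the white fence".split()
print(ngrams(tokens))
# ['The brown', 'brown fox', 'fox jumped', 'jumped over',
#  'over the', 'the white', 'white fence']
```

To get counts across all rows, one option is to explode the bigrams column with pyspark.sql.functions.explode and then use groupBy(...).count().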

Elior Malul

The generator expressions themselves end up as elements of the RDD because of this line:

bigrams = words.flatMap(lambda x:[((x[i],x[i+1]) for i in range(0,len(x)-1))])

You probably just need to replace this with something like:

bigrams = words.flatMap( lambda x:list((x[i],x[i+1]) for i in range(0,len(x)-1)) )

See here for a more in-depth explanation.

Jacob H