0

I am working with data where user info is string. I would like to assign unique integer values to those strings.

I was somewhat following this stack overflow post here. I am using the expression below to have an RDD of tuples:

user = data.map(lambda x:x[0]).distinct().zipWithUniqueId()

After that, I did

data = data.map(lambda x: Rating(int(user.lookup(x[0])), int(x[1]), float(x[2]))) 

What I ultimately want to do is run an ALS model on it, but so far I have been getting this error message

Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation.

I think the data type is somehow wrong, but I am not sure how to fix this.

Community
  • 1
  • 1
user2857014
  • 507
  • 3
  • 10
  • 22
  • 1
    There is 2 issues here. The first one is wanting to update values in a DataFrame, that's impossible ! DataFrame are immutable, you'll have to create a new one from the existing one with the update transformation. Second, you can't nest an RDD inside another RDD transformation. You might consider broadcast variable if your RDD is small. – eliasah Apr 06 '16 at 17:39
  • @eliasah thank you for your input. will newData = data.map(lambda x: Rating(int(user.lookup(x[0])), int(x[1]), float(x[2]))) work, or do I have to do something like df = sqlContext.createDataFrame(?, [cols]), where I am not quite sure how to put stuff in the place of ?. As for the second part, where am I nesting an RDD inside another RDD transformation? My data is quite big actually. – user2857014 Apr 06 '16 at 17:47
  • 1
    That may work, you'll need to try ! I can't read code in comment. As for the second part, your user value is an RDD. So here is where you are try to nest RDD. – eliasah Apr 06 '16 at 17:51

1 Answers1

1

lookup approach suggested in the linked answer is simply invalid. Spark doesn't support nested action nor transformations so you cannot call RDD.lookup inside a map. If data is to large to be handled using a standard Python dict for lookups you can simply join and reshape:

from operator import itemgetter
from pyspark.mllib.recommendation import Rating

data = sc.parallelize([("foo", 1, 2.0), ("bar", 2, 3.0)])

user = itemgetter(0)

def to_rating(record):
    """
    >>> to_rating((("foobar", 99, 5.0), 1000))
    Rating(user=1000, product=99, rating=5.0)
    """
    (_, item, rating), user = record
    return Rating(user, item, rating)

user_lookup = data.map(user).distinct().zipWithIndex()

ratings = (data
    .keyBy(user)  # Add user string as a key
    .join(user_lookup)  # Join with lookup
    .values()  # Drop keys
    .map(to_rating))  # Create Ratings

ratings.first()
## Rating(user=1, product=1, rating=2.0)
zero323
  • 322,348
  • 103
  • 959
  • 935