
I just started learning Spark programming and Python programming. Can you please help me understand the mistake in my code?

I am running the code in a Jupyter Notebook, in interactive mode.

  1. The test code below, where I tested the concept, works fine:

     rdd = sc.parallelize([('librarian', (1, [0, 0, 1, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])), ('librarian', (1, [0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0]))])
    
    result = rdd.reduceByKey(lambda x, y: ((x[0]+y[0]), (x[1][0]+y[1][0]), (x[1][1]+y[1][1]), (x[1][2]+y[1][2]), (x[1][3]+y[1][3]), (x[1][4]+y[1][4]), (x[1][5]+y[1][5]), (x[1][6]+y[1][6]), (x[1][7]+y[1][7]), (x[1][8]+y[1][8]), (x[1][9]+y[1][9]), (x[1][10]+y[1][10]), (x[1][11]+y[1][11])))
    print (result.top(3))
    

    Output:

    [('librarian', (2, 0, 1, 1, 0, 1, 0, 0, 0, 1, 0, 0, 0))]
    
  2. Below also works fine:

    #[(movieid, genre_list)]
    
    aggregateRDD = movieRDD.reduceByKey(lambda x, y: ((x[0]+y[0]), (x[1][0]+y[1][0]), (x[1][1]+y[1][1])))
    print (aggregateRDD.top(3))
    

    Output:

    [(1682, [0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]), (1681, [0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]), (1680, [0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0])]
    
  3. But when I use a similar concept in my program, it fails. What am I doing wrong?

    ##############################################################################
    ### Analysis of Movie Ratings percentages across Occupation and Movie Genre
    ##############################################################################
    
    from pyspark import SparkConf, SparkContext
    
    conf = SparkConf().setMaster("local").setAppName("popularMovie")
    sc = SparkContext(conf=conf)
    
    ###import movie ratings into RDD
    ratingLines = sc.textFile("C:/SparkCourse/ml-100k/u.data")
    ###import user details into RDD
    userLines = sc.textFile("///SparkCourse/ml-100k/u.user")
    ###import movie data into RDD
    movieLines = sc.textFile("C:/SparkCourse/ml-100k/u.item")
    ###import genre data into RDD
    genreLines = sc.textFile("C:/SparkCourse/ml-100k/u.genre")
    
    ###split on delimiter functions
    def splitRatingTab(line):
        line = line.split('\t')
        return (int(line[0]), int(line[1]), int(line[2])) #(movieid, user, rating)
    def splitUserPipe(line):
        line = line.split('|')
        return (int(line[0]), line[3]) #(user, occupation)
    def splitMoviePipe(line):
        line = line.split('|')
        return (int(line[0]), list(listToIntElements(line[5:]))) #(movieid, genre_list[])
    
    
    def listToIntElements(lst):
        """conver the boolean text ('0', '1') genre value to integers (0, 1)"""
        for cnt, _ in enumerate(lst):
            lst[cnt] = int(_)
        return lst
    
    ###create dictionary object for genreid and genre
    def loadMovieGenre():
        """
        create dictionary object for genreid and genre
        """
        genre = {}
        with open('C:/SparkCourse/ml-100k/u.genre') as file:
            for line in file:
                #each line is of type [genere, genreid]
                line = line.split('|')
                #convert genreid to int, to remove new line '\n' at the end of string
                genre[int(line[1])] = line[0]
            return genre
    
    
    ### Transform to RDD as [(movieid, user, rating)] for movies, which are reviewed by viewers
    ratingRDD = ratingLines.map(lambda line: splitRatingTab(line))
    
    ### Transform to RDD as [(user, occupation)]
    occupationRDD = userLines.map(splitUserPipe)
    
    ### Transform to RDD as [(movieid, genre_list)], genre is boolean value, movie can be in multiple genres
    movieRDD = movieLines.map(splitMoviePipe)
    
    ###join Transformed rating RDD [(movieid, (user, rating))] and movieRDD [(movieid, genre)] to get all genres; 
    ###then Transform to [(movieid,((userid, rating), genre) )]
    joinRatingMovieGenres = ratingRDD.map(lambda line: (line[0], (line[1], line[2]))).join(movieRDD)
    
    
    ###Transform joinRatingMovieGenres to RDD [userid, (rating, genre)] and join with occupationRDD [(userid, occupation)]
    ###to Transform to [(occupation, ((1, genre)))]
    transRatingMovieGenres = joinRatingMovieGenres.map(lambda line: (line[1][0][0], (line[1][0][1], line[1][1])))
    joinRatingGenresOccup = transRatingMovieGenres.join(occupationRDD).map(lambda line: (line[1][1], (1, line[1][0][1])))
    print (joinRatingGenresOccup.take(2))
    
    
    ###Transform by Aggregating the ratingCount and genreCount to [(occupation, (totalRatings, {cntGenresRating}))]
    totalRatingGenreCntByOccupation = joinRatingGenresOccup.reduceByKey(lambda x, y: ((x[0]+y[0]), (x[1][0]+y[1][0]), (x[1][1]+y[1][1]), (x[1][2]+y[1][12]), (x[1][3]+y[1][3])))
    print (totalRatingGenreCntByOccupation.take(2))
    

    Error:

    [('librarian', (1, [0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])), ('librarian', (1, [0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0]))]
    ---------------------------------------------------------------------------
    Py4JJavaError                             Traceback (most recent call last)
    <ipython-input-137-a156d8bbfde9> in <module>()
    ----> 1 get_ipython().run_cell_magic('time', '', '\n##############################################################################\n### Analysis of Movie Ratings percentages across Occupation and Movie Genre\n##############################################################################\n\n#import movie ratings into RDD\nratingLines = sc.textFile("C:/SparkCourse/ml-100k/u.data")\n#import user details into RDD\nuserLines = sc.textFile("///SparkCourse/ml-100k/u.user")\n#import movie data into RDD\nmovieLines = sc.textFile("C:/SparkCourse/ml-100k/u.item")\n#import genre data into RDD\ngenreLines = sc.textFile("C:/SparkCourse/ml-100k/u.genre")\n\n#split on delimiter functions\ndef splitRatingTab(line):\n    line = line.split(\'\\t\')\n    return (int(line[0]), int(line[1]), int(line[2])) #(movieid, user, rating)\ndef splitUserPipe(line):\n    line = line.split(\'|\')\n    return (int(line[0]), line[3]) #(user, occupation)\ndef splitMoviePipe(line):\n    line = line.split(\'|\')\n    return (int(line[0]), list(listToIntElements(line[5:]))) #(movieid, genre_list[])\n\n\ndef listToIntElements(lst):\n    """conver the boolean text (\'0\', \'1\') genre value to integers (0, 1)"""\n    for cnt, _ in enumerate(lst):\n        lst[cnt] = int(_)\n    return lst\n\n#create dictionary object for genreid and genre\ndef loadMovieGenre():\n    """\n    create dictionary object for genreid and genre\n    """\n    genre = {}\n    with open(\'C:/SparkCourse/ml-100k/u.genre\') as file:\n        for line in file:\n            #each line is of type [genere, genreid]\n            line = line.split(\'|\')\n            #convert genreid to int, to remove new line \'\\n\' at the end of string\n            genre[int(line[1])] = line[0]\n        return genre\n\n    \n# Transform to RDD as [(movieid, user, rating)] for movies, which are reviewed by viewers\nratingRDD = ratingLines.map(lambda line: splitRatingTab(line))\n#print (\'ratingRDD:\\n\',ratingRDD.top(5))\n\n# Transform to RDD as [(user, occupation)]\noccupationRDD = userLines.map(splitUserPipe)\n#print (\'occupationRDD:\\n\',occupationRDD.top(3))\n\n# Transform to RDD as [(movieid, genre_list)], genre is boolean value, movie can be in multiple genres\nmovieRDD = movieLines.map(splitMoviePipe)\n#print (\'movieRDD:\\n\',movieRDD.top(3))\n\n#join Transformed rating RDD [(movieid, (user, rating))] and movieRDD [(movieid, genre] to get all genres; \n#then Transform to [(movieid,((userid, rating), genre) )]\njoinRatingMovieGenres = ratingRDD.map(lambda line: (line[0], (line[1], line[2]))).join(movieRDD)\n#print (joinRatingMovieGenres.take(2))\n\n#Transform joinRatingMovieGenres to RDD [userid, (rating, genre)] and join with occupationRDD [(userid, occupation)]\n#to Transform to [(occupation, ((1, genre)))]\ntransRatingMovieGenres = joinRatingMovieGenres.map(lambda line: (line[1][0][0], (line[1][0][1], line[1][1])))\njoinRatingGenresOccup = transRatingMovieGenres.join(occupationRDD).map(lambda line: (line[1][1], (1, line[1][0][1])))\nprint (joinRatingGenresOccup.take(2))\n\n\n#Transform by Aggregating the ratingCount and genreCount to [(occupation, (totalRatings, {cntGenresRating}))]\ntotalRatingGenreCntByOccupation = joinRatingGenresOccup.reduceByKey(lambda x, y: ((x[0]+y[0]), (x[1][0]+y[1][0]), (x[1][1]+y[1][1]), (x[1][2]+y[1][12]), (x[1][3]+y[1][3])))\nprint (totalRatingGenreCntByOccupation.take(2))')
    
    C:\Users\vmatcha\AppData\Local\Enthought\Canopy\edm\envs\User\lib\site-packages\IPython\core\interactiveshell.py in run_cell_magic(self, magic_name, line, cell)
       2113             magic_arg_s = self.var_expand(line, stack_depth)
       2114             with self.builtin_trap:
    -> 2115                 result = fn(magic_arg_s, cell)
       2116             return result
       2117 
    
    <decorator-gen-60> in time(self, line, cell, local_ns)
    
    C:\Users\vmatcha\AppData\Local\Enthought\Canopy\edm\envs\User\lib\site-packages\IPython\core\magic.py in <lambda>(f, *a, **k)
        186     # but it's overkill for just that one bit of state.
        187     def magic_deco(arg):
    --> 188         call = lambda f, *a, **k: f(*a, **k)
        189 
        190         if callable(arg):
    
    C:\Users\vmatcha\AppData\Local\Enthought\Canopy\edm\envs\User\lib\site-packages\IPython\core\magics\execution.py in time(self, line, cell, local_ns)
       1183         else:
       1184             st = clock2()
    -> 1185             exec(code, glob, local_ns)
       1186             end = clock2()
       1187             out = None
    
    <timed exec> in <module>()
    
    C:\spark\python\pyspark\rdd.py in take(self, num)
       1356 
       1357             p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
    -> 1358             res = self.context.runJob(self, takeUpToNumLeft, p)
       1359 
       1360             items += res
    
    C:\spark\python\pyspark\context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal)
        999         # SparkContext#runJob.
       1000         mappedRDD = rdd.mapPartitions(partitionFunc)
    -> 1001         port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
       1002         return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
       1003 
    
    C:\spark\python\lib\py4j-0.10.6-src.zip\py4j\java_gateway.py in __call__(self, *args)
       1158         answer = self.gateway_client.send_command(command)
       1159         return_value = get_return_value(
    -> 1160             answer, self.gateway_client, self.target_id, self.name)
       1161 
       1162         for temp_arg in temp_args:
    
    C:\spark\python\pyspark\sql\utils.py in deco(*a, **kw)
         61     def deco(*a, **kw):
         62         try:
    ---> 63             return f(*a, **kw)
         64         except py4j.protocol.Py4JJavaError as e:
         65             s = e.java_exception.toString()
    
    C:\spark\python\lib\py4j-0.10.6-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
        318                 raise Py4JJavaError(
        319                     "An error occurred while calling {0}{1}{2}.\n".
    --> 320                     format(target_id, ".", name), value)
        321             else:
        322                 raise Py4JError(
    
    Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 239.0 failed 1 times, most recent failure: Lost task 1.0 in stage 239.0 (TID 447, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "C:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 229, in main
      File "C:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 224, in process
      File "C:\spark\python\pyspark\rdd.py", line 2438, in pipeline_func
        return func(split, prev_func(split, iterator))
      File "C:\spark\python\pyspark\rdd.py", line 2438, in pipeline_func
        return func(split, prev_func(split, iterator))
      File "C:\spark\python\pyspark\rdd.py", line 362, in func
        return f(iterator)
      File "C:\spark\python\pyspark\rdd.py", line 1857, in combineLocally
        merger.mergeValues(iterator)
      File "C:\spark\python\lib\pyspark.zip\pyspark\shuffle.py", line 238, in mergeValues
        d[k] = comb(d[k], v) if k in d else creator(v)
      File "<timed exec>", line 73, in <lambda>
    TypeError: 'int' object is not subscriptable
    
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1126)
        at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)
    
    Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
        at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:141)
        at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
        at sun.reflect.GeneratedMethodAccessor90.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:214)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "C:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 229, in main
      File "C:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 224, in process
      File "C:\spark\python\pyspark\rdd.py", line 2438, in pipeline_func
        return func(split, prev_func(split, iterator))
      File "C:\spark\python\pyspark\rdd.py", line 2438, in pipeline_func
        return func(split, prev_func(split, iterator))
      File "C:\spark\python\pyspark\rdd.py", line 362, in func
        return f(iterator)
      File "C:\spark\python\pyspark\rdd.py", line 1857, in combineLocally
        merger.mergeValues(iterator)
      File "C:\spark\python\lib\pyspark.zip\pyspark\shuffle.py", line 238, in mergeValues
        d[k] = comb(d[k], v) if k in d else creator(v)
      File "<timed exec>", line 73, in <lambda>
    TypeError: 'int' object is not subscriptable
    
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1126)
        at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        ... 1 more 
    

1 Answer


When you use reduceByKey, your function has to return a value with the same structure as the values it receives; otherwise, the next time your function meets another value for the same key and tries to reduce it together with the already-reduced result, it will fail.

You have only tested on two elements, so you haven't hit this yet, but if you try with three:

 rdd = sc.parallelize([('librarian', (1, [0, 0, 1, 0])), ('librarian', (1, [0, 1, 0, 0])),\
                      ('librarian', (1, [0, 1, 0, 0]))])

result = rdd.reduceByKey(lambda x, y: ((x[0]+y[0]),\
                            (x[1][0]+y[1][0]), (x[1][1]+y[1][1]), (x[1][2]+y[1][2]), (x[1][3]+y[1][3]) ))

.....
File "/home/hado/spark-1.6.1-bin-hadoop2.6/python/pyspark/rdd.py", line 1784, in _mergeCombiners
    merger.mergeCombiners(iterator)
File "/home/hado/spark-1.6.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/shuffle.py", line 272, in mergeCombiners
    d[k] = comb(d[k], v) if k in d else v
File "<stdin>", line 3, in <lambda>
TypeError: 'int' object has no attribute '__getitem__'
.....

The right way to do the reduceByKey in your code is to return the same structure you receive: a tuple with a count and a list of the same size:

 rdd = sc.parallelize([('librarian', (1, [0, 0, 1, 0])), ('librarian', (1, [0, 1, 0, 0])),\
                      ('librarian', (1, [0, 1, 0, 0]))])

result = rdd.reduceByKey(lambda x, y: ( x[0] + y[0],\
                            [x[1][0]+y[1][0], x[1][1]+y[1][1], x[1][2]+y[1][2], x[1][3]+y[1][3] ] ))
print (result.collect())

[('librarian', (3, [0, 2, 1, 0]))]
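
Applied to your program, where the values of joinRatingGenresOccup are (1, genre_list) with 19 genre flags, you don't have to write out every index: you can sum the lists element-wise with zip. A minimal sketch, assuming the (occupation, (1, genre_list)) shape shown by your print(joinRatingGenresOccup.take(2)):

totalRatingGenreCntByOccupation = joinRatingGenresOccup.reduceByKey(
    lambda x, y: (x[0] + y[0],                           # total number of ratings
                  [a + b for a, b in zip(x[1], y[1])]))  # element-wise genre counts
print(totalRatingGenreCntByOccupation.take(2))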

You can also do a combineByKey, as explained here: `combineByKey`, pyspark
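
If you go that route, here is a minimal combineByKey sketch on the same toy data (the helper names to_counts, add_value and merge_counts are only illustrative, not from your code):

rdd = sc.parallelize([('librarian', (1, [0, 0, 1, 0])), ('librarian', (1, [0, 1, 0, 0])),
                      ('librarian', (1, [0, 1, 0, 0]))])

def to_counts(value):
    # the first (count, genre_list) seen for a key becomes the accumulator
    count, genres = value
    return (count, list(genres))

def add_value(acc, value):
    # fold one more (count, genre_list) into the accumulator
    return (acc[0] + value[0], [a + b for a, b in zip(acc[1], value[1])])

def merge_counts(acc1, acc2):
    # combine accumulators built on different partitions
    return (acc1[0] + acc2[0], [a + b for a, b in zip(acc1[1], acc2[1])])

result = rdd.combineByKey(to_counts, add_value, merge_counts)
print(result.collect())   # [('librarian', (3, [0, 2, 1, 0]))]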

Also note that "(x[1][2]+y[1][12])" in the second-to-last line of your code looks like a typo (presumably it should be y[1][2]).
