0
animals_population_file = sc.textFile("input/myFile1.txt")
animals_place_file = sc.textFile("input/myFile2.txt")

animals_population_file:

Dogs, 5
Cats, 6

animals_place_file:

Dogs, Italy
Cats, Italy
Dogs, Spain

Now I want to join animals_population_file and animals_place_file using the type of animals as a key. The result should be this one:

Dogs, [Italy, Spain, 5]
Cats, [Italy, 6]

I tried joined = animals_population_file.join(animals_place_file), but I don't know how to define the key. Also, when I run joined.collect(), it gives me an error:

    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

Py4JJavaError: An error occurred while calling o247.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 21.0 failed 1 times, most recent failure: Lost task 0.0 in stage 21.0 (TID 29, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/worker.py", line 101, in main
    process()
  File "/usr/lib/spark/python/pyspark/worker.py", line 96, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/pyspark/serializers.py", line 236, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/lib/spark/python/pyspark/rdd.py", line 1807, in <lambda>
    map_values_fn = lambda (k, v): (k, f(v))
ValueError: too many values to unpack
Klausos Klausos
  • 15,308
  • 51
  • 135
  • 217
  • @zero323: I tried solutions provided in the mentioned thread. However, my problem is that I don't know how to point the key in this code df1.join(df2, df1.k == df2.k, joinType='inner'). Also, if I just do df1.join(df2), then collect() gives an error. It is not explained in the pointed thread. – Klausos Klausos Nov 22 '15 at 15:55
  • What you've shown above is an exact content of your files? – zero323 Nov 22 '15 at 15:58
  • @zero323: Exact content: RDD1 = [u'Surreal_Games', u'269', u'Hourly_Games', u'428', u'Hot_Talking', u'747', u'Almost_Sports', u'350'] and another RDD2 = [ u'Loud_Games', u'BAT', u'Cold_Talking', u'DEF', u'Surreal_Sports', u'XYZ', u'Hourly_Sports', u'CAB', u'Hot_Talking', u'MAN', u'Almost_Cooking', u'BAT'] – Klausos Klausos Nov 22 '15 at 16:06
  • No internal tuples or lists? Like `[(u'Surreal_Games', u'269'), ...]`. – zero323 Nov 22 '15 at 16:14
  • @zero323: Yes, what I putted is the exact copy-paste from RDD1.collect() and RDD2.collect() – Klausos Klausos Nov 22 '15 at 16:21
  • Let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/95854/discussion-between-zero323-and-klausos-klausos). – zero323 Nov 22 '15 at 16:24

1 Answers1

2

You don't have PairRdd when you run textFile (basing on your rdds content from comment). To make joins, you need the PairRDD however. So turn your inputs into pairRDDs

val rdd1 = sc.textFile("input/myFile1.txt")
val rdd2 = sc.textFile("input/myFile2.txt")

val data1 = rdd1.map(line => line.split(",").map(elem => elem.trim))
val data2 = rdd2.map(line => line.split(",").map(elem => elem.trim))

val pairRdd1 = data1.map(r => (r(0), r))  /** zero index is the animal type which is the key in file 1*/
val pairRdd2 = data2.map(r => (r(0), r))  /** zero index is the animal type which is the key in file 2 as well */

val joined = pairRdd1.join(pairRdd2)

val local = joined.collect()
local.foreach{case (k, v) => {
  print(k + " : ")
  println(v._1.mkString("|") + "|" + v._2.mkString("|"))
}}
Elena Viter
  • 514
  • 6
  • 12