I want to declare a function that cogroups two RDDs. In effect it is an intersection by key. The code below does not compile:

def getRetain[K, V](activeUserRdd: RDD[(K, V)], newUserRdd: RDD[(K, V)]): RDD[(K, V)] = {
  activeUserRdd.cogroup(newUserRdd).flatMapValues {
    x => Option((if (!x._1.isEmpty && !x._2.isEmpty) x._2.head else null).asInstanceOf[V])
  }
}

Error:

value cogroup is not a member of org.apache.spark.rdd.RDD[(K, V)]

I think the (K, V) in my signature does not match the [(K, V)] that cogroup is declared for. What is the right way to declare the type parameters in my function?

Orest Hera

1 Answer

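Add ClassTag context bounds to K and V. cogroup is defined on PairRDDFunctions, and the implicit conversion from RDD[(K, V)] to PairRDDFunctions requires an implicit ClassTag for both K and V. In a generic method without those bounds the conversion does not apply, so the compiler reports that cogroup is not a member of RDD[(K, V)]. The tags are needed because K and V are erased and would otherwise be unavailable at runtime.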

scala> import scala.reflect.ClassTag
import scala.reflect.ClassTag

scala> def getRetain[K : ClassTag, V : ClassTag](activeUserRdd : RDD[(K, V)], newUserRdd : RDD[(K, V)]): RDD[(K, V)] ={
 |       activeUserRdd.cogroup(newUserRdd).flatMapValues{
 |         x => Option((if (!x._1.isEmpty && !x._2.isEmpty) x._2.head else  null).asInstanceOf[V])
 |       }
 |     }
 getRetain: [K, V](activeUserRdd: org.apache.spark.rdd.RDD[(K, V)], newUserRdd: org.apache.spark.rdd.RDD[(K, V)])(implicit evidence$1: scala.reflect.ClassTag[K], implicit evidence$2: scala.reflect.ClassTag[V])org.apache.spark.rdd.RDD[(K, V)]
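
To see why the missing context bounds surface as a "not a member" error, here is a minimal sketch in plain Scala 2 with no Spark dependency. Box, PairOps, toPairOps, keysOnly and keysOf are hypothetical stand-ins for RDD, PairRDDFunctions and their methods; they are not Spark APIs and only mirror the shape of the conversion.

```scala
import scala.language.implicitConversions
import scala.reflect.ClassTag

// Hypothetical stand-in for RDD: a plain wrapper around a Seq.
final case class Box[T](items: Seq[T])

// Hypothetical stand-in for PairRDDFunctions: extra methods for pair boxes.
final class PairOps[K, V](self: Box[(K, V)]) {
  def keysOnly: Seq[K] = self.items.map(_._1)
}

object Box {
  // The conversion is only eligible when ClassTags for K and V are in
  // implicit scope, which is the same shape as Spark's pair conversion.
  implicit def toPairOps[K, V](b: Box[(K, V)])(
      implicit kt: ClassTag[K], vt: ClassTag[V]): PairOps[K, V] =
    new PairOps(b)
}

object Demo {
  // Compiles: the context bounds supply the ClassTags the conversion needs.
  def keysOf[K: ClassTag, V: ClassTag](b: Box[(K, V)]): Seq[K] = b.keysOnly

  // Does not compile if uncommented, because no ClassTag[K] or ClassTag[V]
  // is available and the conversion is therefore never applied:
  //   value keysOnly is not a member of Box[(K, V)]
  // def broken[K, V](b: Box[(K, V)]): Seq[K] = b.keysOnly

  def main(args: Array[String]): Unit =
    println(keysOf(Box(Seq("a" -> 1, "b" -> 2))))
}
```

At a call site with concrete types such as String and Int the compiler supplies the ClassTags itself, which is why the problem only shows up inside generic methods like getRetain.
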
Rohan Aletty
  • I don't have the package scala.reflect.TypeTag. Do you know of another solution that uses only plain Scala? I think it would help me a lot in understanding Scala. Also, the error was thrown at compile time. Thanks. – yyforever1988 Nov 17 '15 at 05:43
  • Sorry, I mistyped the import. Please try again. – Rohan Aletty Nov 17 '15 at 05:52