How can I create a generic min() function in Spark which returns a value of the same type as the generic used?

Here's what I have for doubles and strings:

def minDouble(rdd: RDD[Map[String, String]], field: String): Double = {
  rdd.map(row => row(field).toDouble).min()
}

def minString(rdd: RDD[Map[String, String]], field: String): String = {
  rdd.map(row => row(field)).min()
}

How can I use generics to combine these into a single function, and keep it extensible so that I can add other types later?

Here's my attempt:

def minGeneric[V : Manifest](rdd: RDD[Map[String, String]], field: String)(implicit ord: Ordering[V]): V = {
  rdd.map(row => cast[V](row(field))).min().get
}

which uses the cast() function from the question "Writing a generic cast function Scala".

When I run this, I get a NoSuchElementException: None.get:

Exception in thread "main" java.util.NoSuchElementException: None.get
        at scala.None$.get(Option.scala:313)
        at scala.None$.get(Option.scala:311)
        at SimpleApp$.statMinGeneric(SimpleApp.scala:67)
        at SimpleApp$.main(SimpleApp.scala:34)
        at SimpleApp.main(SimpleApp.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:483)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Why is it returning None here? Am I taking the right approach? Thanks in advance.

Corey Wu

1 Answer

If you can pass an additional parameter to your function, you can do the following:

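import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

// Convert each value first, then take the minimum, so that values are compared
// as A rather than as strings ("10" sorts before "9" lexicographically).
def minCasted[A: Ordering: ClassTag](caster: String => A)(rdd: RDD[Map[String, String]], field: String): A = {
    rdd.map(row => caster(row(field))).min()
}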

def minDouble = minCasted(_.toDouble) _

def minString = minCasted(identity[String]) _
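
If passing the converter explicitly is not wanted, one possible variation is to look it up implicitly through a type class, so that supporting a new type only means adding one more instance. This is a minimal sketch under stated assumptions: FieldParser and its instances are hypothetical names introduced here for illustration and are not part of Spark, and the code is assumed to live inside an object (or be pasted into spark-shell with :paste) with Spark on the classpath.

```scala
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

// Hypothetical type class (not a Spark API): how to parse a String field into an A.
// It extends Serializable because the instance is captured by the closure sent to executors.
trait FieldParser[A] extends Serializable {
  def parse(s: String): A
}

object FieldParser {
  implicit val doubleParser: FieldParser[Double] = new FieldParser[Double] {
    def parse(s: String): Double = s.toDouble
  }
  implicit val stringParser: FieldParser[String] = new FieldParser[String] {
    def parse(s: String): String = s
  }
  // Further types can be supported by adding more implicit instances here.
}

// Parse every value first, then take the minimum using the Ordering of A.
// RDD.map needs a ClassTag for the element type and RDD.min needs an Ordering.
def minGeneric[A: FieldParser: Ordering: ClassTag](rdd: RDD[Map[String, String]], field: String): A = {
  val parser = implicitly[FieldParser[A]]
  rdd.map(row => parser.parse(row(field))).min()
}
```

The result type is chosen at the call site, for example minGeneric[Double](rdd, "price") or minGeneric[String](rdd, "name"), where the field names are placeholders. As with RDD.min() itself, calling it on an empty RDD throws an exception.
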
Dmitry Ginzburg