
I have a Scala class that returns the max of an input type IN using an MR job in Apache Spark. The class works fine when called from Scala, as follows:

// Class
import org.apache.spark.sql.TypedColumn
import org.apache.spark.sql.expressions.Aggregator
import scala.reflect.runtime.universe.TypeTag

class TypedMax[IN, T](val f: IN => T)(implicit ev$1: T => Ordered[T], ev$2: TypeTag[T])
  extends Aggregator[IN, T, T] {

  override def zero: T = null.asInstanceOf[T]
  override def reduce(b: T, a: IN): T = if (b > f(a)) b else f(a)
  override def merge(b1: T, b2: T): T = if (b1 > b2) b1 else b2
  override def finish(reduction: T): T = reduction
}

// Call function
def max[IN, T](f: IN => T)(implicit ev$1: T => Ordered[T], ev$2: TypeTag[T]): TypedColumn[IN, T] =
    new TypedMax[IN, T](f).toColumn
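
For context, this is roughly how the Scala side gets called. This is a minimal sketch, not part of the original post: Person, people, and spark are illustrative names and a running SparkSession is assumed; the point is that scalac supplies both implicits at the call site.

import org.apache.spark.sql.Dataset

// Hypothetical example data (assumes `spark` is an existing SparkSession).
case class Person(name: String, age: Long)

import spark.implicits._

val people: Dataset[Person] =
  spark.createDataset(Seq(Person("alice", 30L), Person("bob", 40L)))

// ev$1 (Long => Ordered[Long]) and ev$2 (TypeTag[Long]) are resolved implicitly here.
val maxAge: Dataset[Long] = people.select(max((p: Person) => p.age))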

Now I would like to make this callable from Java as well, but I am having some difficulty passing in the implicit parameters. I know implicit parameters can be passed explicitly by appending them to the argument list when calling Scala from Java, but here they need to be forwarded inside a Scala auxiliary constructor. Therefore, I am trying the following:

import org.apache.spark.api.java.function.MapFunction

class TypedMax[IN, T](val f: IN => T)(implicit ev$1: T => Ordered[T], ev$2: TypeTag[T])
  extends Aggregator[IN, T, T] {

  override def zero: T = null.asInstanceOf[T]
  override def reduce(b: T, a: IN): T = if (b > f(a)) b else f(a)
  override def merge(b1: T, b2: T): T = if (b1 > b2) b1 else b2
  override def finish(reduction: T): T = reduction

  // Java api support
  def this(f: MapFunction[IN, java.lang.Double]) =
    this(x => f.call(x).asInstanceOf[T])(ev$1: T => Ordered[T], ev$2: TypeTag[T])
}

Which can then be called from java:

public static <T> TypedColumn<T, Double> max(MapFunction<T, Double> f) {
  return new TypedMax<T, Double>(f).toColumn();
}

I have tried many permutations of passing in the implicits from the auxiliary constructor, using implicitly, and playing around with commas and parentheses. However, it always complains that ev$1 and ev$2 are not found. I do have to pass in the parameters somehow, because otherwise it errors with:

Error:(135, 5) No implicit view available from T => Ordered[T].
    this(x => f.call(x).asInstanceOf[T])
Error:(135, 5) not enough arguments for constructor TypedMax: (implicit ev$1: T => Ordered[T], implicit ev$2: reflect.runtime.universe.TypeTag[T])org.apache.spark.sql.execution.aggregate.TypedMax[IN,T].
Unspecified value parameters ev$1, ev$2.
    this(x => f.call(x).asInstanceOf[T])

And if I try:

def this(f: MapFunction[IN, T]) =
  this(x => f.call(x))(T => Ordered[T], TypeTag[T])

The result is:

Error:(135, 38) value Ordered of type scala.math.Ordered.type does not take type parameters.
    this(x => f.call(x))(T => Ordered[T], TypeTag[T])
Error:(135, 50) missing argument list for method apply in object TypeTag
Unapplied methods are only converted to functions when a function type is expected.
You can make this conversion explicit by writing `apply _` or `apply(_,_)` instead of `apply`.
    this(x => f.call(x))(T => Ordered[T], TypeTag[T])

What am I missing / misunderstanding? Thanks!

ScrubS

1 Answer


T => Ordered[T] does not mean what you think it means: it creates a Function1 with an argument named T and body scala.math.Ordered.apply[T]. Try the following instead:

def this(f: MapFunction[IN, T], e: MapFunction[T, Ordered[T]], tt: TypeTag[T]) =
  this(x => f.call(x))(x => e.call(x), tt)

(I'm not really sure you are going to be able to materialize a TypeTag[T] from Java.)

OlivierBlanvillain
  • Thanks Olivier. You are right: I cannot materialize the TypeTag. I am currently trying to write a wrapper around it to circumvent the whole issue, but even that is not working. Do you have any suggestions for that? `class TypedMaxJava[IN, T](f: MapFunction[IN, T]) { def typedMax: TypedColumn[IN, T] = { new TypedMax[IN, T](x => f.call(x))(Ordering[T], TypeTag[T]).toColumn } }` isn't sufficient, unfortunately. – ScrubS May 27 '17 at 21:16
  • There is no clean way around it, `TypeTags` are materialized by scalac... If you know in advance the subset of supported types you could have an `object TypeTagForJava { def ttInt = TypeTag[Int]; ... }`. Otherwise you will have to fall back to reflection, which is what Spark uses anyway: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala – OlivierBlanvillain May 27 '17 at 22:17
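
For completeness, here is a minimal sketch of the helper object suggested in the last comment, assuming the set of supported result types is known in advance. The object and method names are illustrative, not an established API:

import scala.reflect.runtime.universe.{TypeTag, typeTag}

// Hypothetical helper: pre-materializes TypeTags on the Scala side so that a
// Java caller can pass one explicitly (e.g. as the `tt` argument of the
// constructor shown in the answer).
object TypeTagForJava {
  def ttInt: TypeTag[Int] = typeTag[Int]
  def ttLong: TypeTag[Long] = typeTag[Long]
  def ttDouble: TypeTag[Double] = typeTag[Double]
  def ttString: TypeTag[String] = typeTag[String]
}

From Java, these should be reachable either through the generated static forwarders (TypeTagForJava.ttDouble()) or as TypeTagForJava$.MODULE$.ttDouble().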