
I ran into a problem when I needed to pass a Java HashMap to a UDF that was defined as a separate class, rather than as an inline lambda that can access broadcast variables from the enclosing driver scope. I had originally asked this question for that purpose:

How do I pass Spark broadcast variable to a UDF in Java?

No satisfactory answer was provided; the answers I received covered only simple UDFs that can be defined as small lambdas and can therefore access broadcast variables from the driver program.
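For contrast, the inline-lambda pattern those answers suggested looks roughly like this. This is a sketch only: the SparkSession `spark`, the Dataset `data`, the column name `key`, and the UDF name `InlineUDF` are all assumed, not taken from the original question.

```java
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import java.util.HashMap;

// Sketch: assumes an existing SparkSession `spark`, a Dataset<Row> `data`
// with a column `key`, and a populated HashMap<String, String> `testMap`.
Broadcast<HashMap<String, String>> bc =
        new JavaSparkContext(spark.sparkContext()).broadcast(testMap);

// An inline lambda UDF can close over the broadcast variable `bc` directly:
spark.udf().register("InlineUDF",
        (UDF1<String, String>) k -> bc.value().get(k),
        DataTypes.StringType);

data = data.withColumn("resolved", functions.callUDF("InlineUDF", data.col("key")));
```

This works only because the lambda is defined in the driver code where `bc` is in scope; a UDF defined as a separate class has no such enclosing scope, which is what forces the `typedLit` approach described below.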

I then started investigating typedLit, as detailed in the other question, and it seemed to be the way forward, but almost no documentation exists for this method in Java, though examples and tutorials exist for Scala. My question, therefore, is: how do I pass a complex variable's value to a UDF using typedLit?

supriyo_basak

1 Answer


I arrived at the answer to this question via a long, difficult path and am posting this here as a help to anyone else who may face this same problem.

The official Spark Javadocs give the typedLit method definition as follows:

typedLit(T literal, scala.reflect.api.TypeTags.TypeTag<T> evidence$1)

There is almost no documentation on how to use this method from Java; finally, I chanced upon this question:

How to get the TypeTag for a class in Java

That answer shows how to create a custom Scala object for the class we want to send to a UDF. Using it, I created my custom Scala object for a Scala Map:

import scala.reflect.runtime.universe._
import scala.collection.convert._
object TypeTags {
  val MapString = typeTag[scala.collection.Map[String, String]]
}

In order to use this object in my Java Maven project, I followed the structure as given by this blog:

https://dzone.com/articles/scala-in-java-maven-project

The dependency which I had to include in my pom is as follows:

<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.11.7</version>
</dependency>

However, the Scala lifecycle tags present in that pom did not compile for me. This was the initial pom snippet:

<plugin>
    <groupId>net.alchim31.maven</groupId>
    <artifactId>scala-maven-plugin</artifactId>
    <executions>
        <execution>
            <id>scala-compile-first</id>
            <phase>process-resources</phase>
            <goals>
                <goal>add-source</goal>
                <goal>compile</goal>
            </goals>
        </execution>
        <execution>
            <id>scala-test-compile</id>
            <phase>process-test-resources</phase>
            <goals>
                <goal>testCompile</goal>
            </goals>
        </execution>
    </executions>
</plugin>

Then I found this question which contains a separate set of lifecycle tags:

My mixed Scala/Java Maven project doesn't compile

Also, I downloaded the mixed Java/Scala project given by the following link:

https://github.com/danyaljj/sampleMixedScalaJavaMavenProject/blob/master/pom.xml

The pom from this project finally worked for me, and I could get past the compilation problem caused by the lifecycle tags. The new pom snippet is as follows:

<plugin>
    <groupId>net.alchim31.maven</groupId>
    <artifactId>scala-maven-plugin</artifactId>
    <version>3.2.2</version>
    <executions>
        <execution>
            <id>compile</id>
            <goals>
                <goal>compile</goal>
            </goals>
            <phase>compile</phase>
        </execution>
        <execution>
            <id>test-compile</id>
            <goals>
                <goal>testCompile</goal>
            </goals>
            <phase>test-compile</phase>
        </execution>
        <execution>
            <phase>process-resources</phase>
            <goals>
                <goal>compile</goal>
            </goals>
        </execution>
    </executions>
</plugin>

There were then a lot of compilation errors when I tried to use the TypeTag I had defined from my main class. Finally, I used the answer to this question for my purpose:

Convert Java Map to Scala Map

First, I had to import into my main class the Scala object that I had defined in the file TypeTags.scala:

import com.esrx.dqm.datasync.TypeTags$;

I defined a dummy Map to send over to my UDF:

Map<String, String> testMap = new HashMap<>();
testMap.put("1", "One");

Then I converted the hashmap to a Scala map:

List<Tuple2<String, String>> tuples = testMap.entrySet().stream()
        .map(e -> Tuple2.apply(e.getKey(), e.getValue()))
        .collect(Collectors.toList());

scala.collection.Map scalaMap = scala.collection.Map$.MODULE$.apply(JavaConversions.asScalaBuffer(tuples).toSeq());
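As a side check of the stream-mapping step above, the same pattern can be run with only the JDK, using `AbstractMap.SimpleImmutableEntry` in place of `Tuple2` (the class name `MapToPairs` is mine, purely illustrative; it does not require scala-library on the classpath):

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class MapToPairs {
    public static void main(String[] args) {
        Map<String, String> testMap = new HashMap<>();
        testMap.put("1", "One");

        // Same shape as the Tuple2 conversion above, with a JDK pair type
        List<SimpleImmutableEntry<String, String>> pairs = testMap.entrySet().stream()
                .map(e -> new SimpleImmutableEntry<>(e.getKey(), e.getValue()))
                .collect(Collectors.toList());

        System.out.println(pairs.get(0).getKey() + "=" + pairs.get(0).getValue());
    }
}
```

The key point is that the entry set is turned into a flat list of pairs first; that list of `Tuple2`s is what `scala.collection.Map$.MODULE$.apply` consumes as a `Seq` in the real conversion.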

Then I sent the map over to my UDF which I had defined earlier:

TypeTags$ type = TypeTags$.MODULE$;
data = data.withColumn("broadcast", functions.callUDF("TestUDF", functions.typedLit(scalaMap, type.MapString())));

I was not able to pass the MapString val to the UDF directly, as the compiler kept complaining that it has private access in TypeTags. From the link here, I found that from Java, Scala vals are accessed through compiler-generated getter-like methods (hence type.MapString()), not as fields.

I defined TestUDF as follows:

public class TestUDF implements UDF1<scala.collection.immutable.Map<String, String>, String> {

    @Override
    public String call(scala.collection.immutable.Map<String, String> t1) throws Exception {
        System.out.println(t1);
        // Convert the Scala Map back into a Java Map inside the UDF
        AsJava<Map<String, String>> asJavaMap = JavaConverters.mapAsJavaMapConverter(t1);
        Map<String, String> javaMap = asJavaMap.asJava();
        System.out.println("Value of 1: " + javaMap.get("1"));
        return null;
    }
}

This finally worked and I could access the map from my UDF.

  • This helped me out big time, thanks. But I faced an incompatibility issue with the Map because of the Scala version: the UDF receives an `immutable` Map, but the `scalaMap` defined above is `mutable`. In case people face this issue in the future, keep this scenario in mind. – Avi Dec 30 '20 at 16:10
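On the mutable/immutable mismatch noted in the comment above: one possible workaround (an untested sketch, assuming Scala 2.11 as in the pom) is to build an immutable Scala Map directly from Java via JavaConverters, so the literal's type matches what the UDF signature declares:

```java
import scala.Predef;
import scala.collection.JavaConverters;
import java.util.HashMap;
import java.util.Map;

Map<String, String> testMap = new HashMap<>();
testMap.put("1", "One");

// asScala() yields a mutable view of the Java map; toMap(...) copies it into a
// scala.collection.immutable.Map, matching the type TestUDF's call() expects.
scala.collection.immutable.Map<String, String> immutableMap =
        JavaConverters.mapAsScalaMapConverter(testMap).asScala()
                      .toMap(Predef.$conforms());
```

The corresponding TypeTag in TypeTags.scala would then use `scala.collection.immutable.Map` rather than `scala.collection.Map`.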