
I have a Properties variable which I populate by calling a REST service. I need to pass this list to my UDFs. I thought a broadcast variable would serve my purpose well, since the Properties list can be long. So I created a broadcast variable in my main class:

Properties kp = getApplicationProperties(rootPath);
Broadcast<Properties> brVar = sc.broadcast(kp);

However, how do I pass this variable into my UDFs? I tried passing it as a literal using org.apache.spark.sql.functions.lit, but this led to my UDF never being called:

spark.sqlContext().udf().registerJava("MongoInsert", MongoInsert.class.getName(), DataTypes.StringType);
persondatasetwithResid.select(callUDF("MongoInsert", lit(rootPath).cast(DataTypes.StringType),
                            col("value").cast(DataTypes.StringType), col("resourceId").cast(DataTypes.StringType),
                            lit(brVar))).show();


public class MongoInsert implements UDF4<String, String, String, Broadcast<Properties>, String> {
  @Override
  public String call(String rootPath, String jsonstring, String resourceId, Broadcast<Properties> brVar)
        throws Exception {

    Properties kp = brVar.value();
    // ... use kp to insert jsonstring into Mongo ...
    return resourceId;
  }
}

I cannot find any resource that explains how to pass broadcast variables to UDFs in Java. Please help me.
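One pattern I am considering (my own untested sketch, not something I found in the Spark docs): stop passing the broadcast as a column at all, keep it in a field of the UDF class, set it through the constructor, and register the configured instance with spark.udf().register(...). As I understand it, registerJava(...) instantiates the class by name through a no-arg constructor, so it cannot carry state, whereas register(...) uses exactly the instance you hand it. The class and member names below are made up:

```java
import java.io.Serializable;
import java.util.Properties;

import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.api.java.UDF3;

// Sketch: the UDF holds the broadcast itself, so only the three
// string columns travel through the query.
public class MongoInsertUdf implements UDF3<String, String, String, String>, Serializable {

    private final Broadcast<Properties> brVar;

    public MongoInsertUdf(Broadcast<Properties> brVar) {
        this.brVar = brVar;
    }

    @Override
    public String call(String rootPath, String jsonstring, String resourceId) throws Exception {
        Properties kp = brVar.value();   // resolved on the executor
        // ... use kp to insert jsonstring into Mongo; details omitted ...
        return resourceId;               // placeholder return value
    }
}
```

Registration and invocation would then look like spark.udf().register("MongoInsert", new MongoInsertUdf(brVar), DataTypes.StringType) followed by callUDF("MongoInsert", ...) with just the three string columns.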

EDIT

I am getting indications from across the web that only column types and literal string types can be passed into UDFs. Is this so? Can other variables like maps, arrays, etc. not be passed in? Remember, I am talking about Spark with Java. In Scala, there seems to be a lot more flexibility.

EDIT 2

I am also finding a lot of literature pointing to something called typedLit, which actually lets us work with Maps and Arrays, e.g. the following question:

How to add a Map column to Spark dataset?

Does this mean that wrapping the variable in lit() won't serve my purpose? I tried wrapping a simple Map variable with typedLit(), but it gives me a compilation error, saying

"The method typedLit(T, TypeTags.TypeTag<T>) in the type functions is not applicable for the arguments (Map<String,String>)"

And quite predictably, there are a plethora of resources on the web about how to use typedLit in Scala, but next to nothing in Spark with Java.
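One workaround I am considering (an assumption on my part, untested): typedLit needs a scala.reflect TypeTag, which is painful to obtain from Java, but a map column can instead be built with org.apache.spark.sql.functions.map, which takes alternating key and value columns. A sketch, with a made-up helper name:

```java
import static org.apache.spark.sql.functions.lit;
import static org.apache.spark.sql.functions.map;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.spark.sql.Column;

public class MapColumnHelper {

    // Build a MapType column from a Java Map without typedLit:
    // functions.map(...) takes alternating key and value columns.
    public static Column mapColumn(Map<String, String> m) {
        List<Column> cols = new ArrayList<>();
        for (Map.Entry<String, String> e : m.entrySet()) {
            cols.add(lit(e.getKey()));
            cols.add(lit(e.getValue()));
        }
        return map(cols.toArray(new Column[0]));
    }
}
```

The resulting Column could then be passed into callUDF like any other column expression.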

EDIT 3

I found another question:

how to set and get static variables from spark?

This one could also provide an answer, as a static variable passed to all the classes could serve my purpose. The answer specifies broadcast variables again, but also mentions closures. Once again, there are no examples of using such closures in Java, not even in the official Spark documentation! If someone could please show me how to create a closure in Java and pass a variable to UDFs using it, it would greatly help me.
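From what I can gather, a closure in Java is just a lambda (or anonymous class) that captures an effectively-final local variable; Spark serializes the lambda together with whatever it captured and ships it to the executors. Something like this sketch, with made-up names, which I believe would sidestep the lit() problem entirely (untested by me):

```java
import java.util.Properties;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;

public class ClosureUdfExample {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local").appName("closure-udf").getOrCreate();
        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

        Properties kp = new Properties();  // stand-in for getApplicationProperties(rootPath)
        kp.setProperty("mongo.collection", "persons");
        Broadcast<Properties> brVar = jsc.broadcast(kp);

        // The lambda captures brVar (a closure); Spark serializes the
        // lambda plus its captured state and sends it to the executors,
        // where brVar.value() reads the broadcast.
        UDF1<String, String> readProp =
                key -> brVar.value().getProperty(key, "missing");
        spark.udf().register("readProp", readProp, DataTypes.StringType);

        spark.sql("SELECT readProp('mongo.collection')").show();
        spark.stop();
    }
}
```

UDF1 extends Serializable, so the lambda itself is serializable along with its captured state.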

supriyo_basak

1 Answer


Here is an example that accesses the broadcast variable at class level rather than passing it in.

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;

public class Test {

    Broadcast<String> broadcastVar = null;

    public UDF1<String, String> myudf = new UDF1<String, String>() {
        @Override
        public String call(String x) {
            return broadcastVar.getValue();
        }
    };

    public void setBroadcastVar(Broadcast<String> broadcastVar){
        this.broadcastVar = broadcastVar;
    }


    public static void main(String[] args) {

        SparkSession spark = SparkSession.builder().master("local").appName("test").getOrCreate();
        JavaSparkContext js = new JavaSparkContext(spark.sparkContext());

        Test t = new Test();
        t.setBroadcastVar(js.broadcast("hello world"));

        spark.udf().register("myudf", t.myudf, DataTypes.StringType);

       //use the udf

    }

}
Salim
  • Thanks for your answer, Salim. However, my UDFs are declared as separate classes of their own with their call methods. This is why I need to pass the broadcast variables into them, somehow. – supriyo_basak Feb 04 '20 at 06:25
  • Hi Salim. I searched for how to send static variables to UDFs, and found this question: https://stackoverflow.com/questions/29685330/how-to-set-and-get-static-variables-from-spark , an answer to which says that static variables defined in the driver aren't visible in executors, and that broadcast variables or closures are the only way. As usual, I am not finding many examples of closures in Spark with Java either. – supriyo_basak Feb 05 '20 at 12:08
  • Have you tried it this way? Define a static variable on a class, populate that variable with a broadcast value, and refer to this variable in the udf class. – Salim Feb 05 '20 at 13:59
  • Hi Salim, sorry for the late response, but I was pulled into other tasks in the meantime. I copied your class, and as the UDF here is defined as an anonymous inner class, it was able to access the outer scope's broadcastVar variable. However, I created the same UDF as a separate class and accessed the variable from there, and immediately a compilation error was thrown. I ignored it and tried to run the program, but a Spark exception stack trace was shown saying Unresolved compilation problem: broadcastVar cannot be resolved – supriyo_basak Mar 03 '20 at 06:17
  • You define the broadcast variable inside the udf class and set it using a setter method. Reuse the instance of the class to call the udf multiple times. – Salim Mar 04 '20 at 13:31
  • The variable which I need to send to the UDFs is initialized in the main driver class. The requirement is to send that variable to the UDFs, not the other way around. – supriyo_basak Mar 05 '20 at 06:46
  • Call the setter from the driver, then call the udf. – Salim Mar 05 '20 at 13:09
  • If I instantiate an object of the UDF and call a setter method there, then it's of no use as in the UDF call statement, no particular object instance of the UDF is passed. Spark instantiates another instance of the UDF itself in the callUDF() method. So the value passed won't be visible in the new instance. – supriyo_basak Mar 10 '20 at 05:55