
I'm trying to use a UDF whose input type is an array of structs. For example, say I have the following structure of data, all of which comes from a single column of a single row in a table:

[
  {
    "id": { "value": "23tsdag" },
    "parser": { },
    "probability": 1
  },
  {
    "id": { "value": "ysadoghues" },
    "parser": { },
    "probability": 0.98
  },
  {
    "id": { "value": "ds8galiusgh4" },
    "parser": { },
    "probability": 0.7
  },
  ...
  ...
  ...
  {
    "id": { "value": "28sh32ds" },
    "parser": { },
    "probability": 0.3
  }
]

For my Java UDF, I want to read this in as a Seq<Row>, since according to Spark SQL UDF with complex input parameter, "... struct types are converted to o.a.s.sql.Row ... data will be exposed as Seq[Row]".

Therefore, this is my Java code:

public class MyUdf implements UDF1<Seq<Row>, String> {

    public String call(Seq<Row> sequence) throws Exception {
        ...
        ...
        ...
        return "Some String";
    }
}
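
For context, here is a minimal sketch of what such a call() body might look like, assuming the schema inferred from the sample JSON above (the field names id, value, and probability come from that sample; the string it builds is purely illustrative):

    import java.util.List;

    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.api.java.UDF1;

    import scala.collection.JavaConverters;
    import scala.collection.Seq;

    public class MyUdf implements UDF1<Seq<Row>, String> {

        @Override
        public String call(Seq<Row> sequence) throws Exception {
            // Convert the Scala Seq into a Java List for iteration
            List<Row> rows = JavaConverters.seqAsJavaListConverter(sequence).asJava();
            StringBuilder result = new StringBuilder();
            for (Row row : rows) {
                // "id" is itself a struct, so it arrives as a nested Row
                Row id = row.getStruct(row.fieldIndex("id"));
                // Spark should infer "probability" as a double from the sample values
                double probability = row.getDouble(row.fieldIndex("probability"));
                result.append(id.getString(id.fieldIndex("value")))
                      .append("=")
                      .append(probability)
                      .append(";");
            }
            return result.toString();
        }
    }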

How can I test this piece of code? Specifically, I've been trying to read JSON from a file, turn it into a Dataset<Row>, turn that into a List<Row>, turn that into a Seq<Row>, and then pass it as a parameter into my UDF as follows:

    @Test
    public void testMyUdf() throws Exception {
        sqlCtx.udf().registerJava("my_udf", MyUdf.class, DataTypes.StringType);
        String filePath = "sample_1.json";
        Dataset<Row> ds = spark.read().option("multiline", "true").json(filePath);
        List<Row> list = ds.collectAsList();
        Seq<Row> sequence = JavaConverters.collectionAsScalaIterableConverter(list).asScala().toSeq();
        sqlCtx.sql( "select my_udf(" + sequence + ")").show();

        ...
        ...
        assertEquals(...)
    }

However, when I do this, I keep getting errors such as this:

org.apache.spark.sql.catalyst.parser.ParseException: 
mismatched input '(' expecting {')', ','}(line 1, pos 52)

== SQL ==
select my_udf(Stream([[ABC/42gadsgy5wsdga==],.....
--------------------^^^

Am I doing something wrong? I've been stuck on this all day and any pointers/tips/help would be greatly appreciated. Thank you.

The whole point of me doing this is so that my UDF can take in a Seq<Row> as described in Spark SQL UDF with complex input parameter. Is this even the right approach?

I wanted to be as generic as possible by using Rows instead of specific classes, because the input contents may be vastly different.


1 Answer


With your UDF registered, you can use it in a Spark SQL expression. For that, your dataset must be queryable, which you can achieve by creating a temporary view of it.

Let me know if the following works for you:

    @Test
    public void testMyUdf() throws Exception {
        sqlCtx.udf().registerJava("my_udf", MyUdf.class, DataTypes.StringType);
        String filePath = "sample_1.json";
        Dataset<Row> ds = spark.read().option("multiline", "true").json(filePath);

        ds.createOrReplaceTempView("my_dataset");

        ds.printSchema(); // this line may be helpful to determine which columns are available
        // struct(*) wraps all columns of each row into a single struct before passing it to the UDF
        sqlCtx.sql("select my_udf(struct(*)) from my_dataset").show();
    }
  • I'm so sorry, I just updated my question with a better example. Can I get some help with the updated example from above? – Ethan Yim Jul 15 '21 at 04:08
  • I am getting "Invalid number of arguments for function hydrate_top_parse_complex_pretty_print. Expected: 1; Found: 3; line 1 pos 7" – Ethan Yim Jul 15 '21 at 04:31
  • I've found a similar question to your update here: https://stackoverflow.com/questions/31816975/how-to-pass-whole-row-to-udf-spark-dataframe-filter . You may have to convert your row into a struct first. I hope this helps – ggordon Jul 15 '21 at 04:36
  • thank you, is there a way to do that in Java? – Ethan Yim Jul 15 '21 at 04:40
  • I've updated the answer again. I'll have to demo locally to provide a more detailed answer – ggordon Jul 15 '21 at 05:06
  • thank you again. When I do it with your updated answer, now I get `Caused by: java.lang.ClassCastException: class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to class scala.collection.Seq` – Ethan Yim Jul 15 '21 at 05:10
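
A note on that last ClassCastException: struct(*) wraps the whole row into a single struct, which reaches the UDF as one Row, while the UDF's signature expects Seq<Row>, i.e. an array of structs. Since the original data lives in a single array-of-struct column, one likely fix is to pass that column to the UDF directly. A hedged sketch, where investments is a hypothetical placeholder for whatever the actual array<struct<...>> column is called:

    ds.createOrReplaceTempView("my_dataset");
    ds.printSchema(); // confirm the name and type of the array-of-struct column

    // "investments" is a placeholder column name; an array<struct<...>> column
    // is what Spark exposes to the UDF as Seq<Row>.
    sqlCtx.sql("select my_udf(investments) from my_dataset").show();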